This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bcec7d711591bdbafe9ec460e68e0f609740b9d4
Author: Gao Yun <yungao...@alibaba-inc.com>
AuthorDate: Mon Jun 17 08:22:41 2019 +0800

    [FLINK-12765][jobmanager] Keep track of the resources used in the task slot 
hierarchy
---
 .../clusterframework/types/ResourceProfile.java    |  4 ++
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  |  1 +
 .../jobmaster/slotpool/SlotSharingManager.java     | 49 +++++++++++++++
 .../jobmaster/slotpool/SlotSharingManagerTest.java | 72 ++++++++++++++++++++++
 4 files changed, 126 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 7ac1dd4..c373fbd 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -51,11 +51,15 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
 
        private static final long serialVersionUID = 1L;
 
+       /** A ResourceProfile that indicates an unknown set of resources. */
        public static final ResourceProfile UNKNOWN = new ResourceProfile();
 
        /** ResourceProfile which matches any other ResourceProfile. */
        public static final ResourceProfile ANY = new 
ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, 
Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, 
Collections.emptyMap());
 
+       /** A ResourceProfile describing zero resources. */
+       public static final ResourceProfile ZERO = new ResourceProfile(0, 0);
+
        // 
------------------------------------------------------------------------
 
        /** How many cpu cores are needed, use double so we can specify cpu 
like 0.1. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index b8d8540..57c42b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -320,6 +320,7 @@ public class SchedulerImpl implements Scheduler {
 
                final SlotSharingManager.SingleTaskSlot leaf = 
multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
                        slotRequestId,
+                       slotProfile.getResourceProfile(),
                        scheduledUnit.getJobVertexId(),
                        multiTaskSlotLocality.getLocality());
                return leaf.getLogicalSlotFuture();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index f5d0dec..a7afbac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -293,6 +294,11 @@ public class SlotSharingManager {
                 * @param cause for the release
                 */
                public abstract void release(Throwable cause);
+
+               /**
+                * Gets the total reserved resources of the slot and its 
descendants.
+                */
+               public abstract ResourceProfile getReservedResources();
        }
 
        /**
@@ -316,6 +322,9 @@ public class SlotSharingManager {
                // true if we are currently releasing our children
                private boolean releasingChildren;
 
+               // the total resources reserved by all the descendants.
+               private ResourceProfile reservedResources;
+
                private MultiTaskSlot(
                                SlotRequestId slotRequestId,
                                AbstractID groupId,
@@ -355,6 +364,8 @@ public class SlotSharingManager {
                        this.children = new HashMap<>(16);
                        this.releasingChildren = false;
 
+                       this.reservedResources = ResourceProfile.ZERO;
+
                        slotContextFuture.whenComplete(
                                (SlotContext ignored, Throwable throwable) -> {
                                        if (throwable != null) {
@@ -404,6 +415,7 @@ public class SlotSharingManager {
                 */
                SingleTaskSlot allocateSingleTaskSlot(
                                SlotRequestId slotRequestId,
+                               ResourceProfile resourceProfile,
                                AbstractID groupId,
                                Locality locality) {
                        Preconditions.checkState(!super.contains(groupId));
@@ -412,6 +424,7 @@ public class SlotSharingManager {
 
                        final SingleTaskSlot leaf = new SingleTaskSlot(
                                slotRequestId,
+                               resourceProfile,
                                groupId,
                                this,
                                locality);
@@ -421,6 +434,8 @@ public class SlotSharingManager {
                        // register the newly allocated slot also at the 
SlotSharingManager
                        allTaskSlots.put(slotRequestId, leaf);
 
+                       reserveResource(resourceProfile);
+
                        return leaf;
                }
 
@@ -490,6 +505,11 @@ public class SlotSharingManager {
                        }
                }
 
+               @Override
+               public ResourceProfile getReservedResources() {
+                       return reservedResources;
+               }
+
                /**
                 * Releases the child with the given childGroupId.
                 *
@@ -501,6 +521,9 @@ public class SlotSharingManager {
 
                                if (child != null) {
                                        
allTaskSlots.remove(child.getSlotRequestId());
+
+                                       // Update the resources of this slot 
and the parents
+                                       
releaseResource(child.getReservedResources());
                                }
 
                                if (children.isEmpty()) {
@@ -509,6 +532,22 @@ public class SlotSharingManager {
                        }
                }
 
+               private void reserveResource(ResourceProfile resourceProfile) {
+                       reservedResources = 
reservedResources.merge(resourceProfile);
+
+                       if (parent != null) {
+                               parent.reserveResource(resourceProfile);
+                       }
+               }
+
+               private void releaseResource(ResourceProfile resourceProfile) {
+                       reservedResources = 
reservedResources.subtract(resourceProfile);
+
+                       if (parent != null) {
+                               parent.releaseResource(resourceProfile);
+                       }
+               }
+
                @Override
                public String toString() {
                        String physicalSlotDescription;
@@ -539,13 +578,18 @@ public class SlotSharingManager {
                // future containing a LogicalSlot which is completed once the 
underlying SlotContext future is completed
                private final CompletableFuture<SingleLogicalSlot> 
singleLogicalSlotFuture;
 
+               // the resource profile of this slot.
+               private final ResourceProfile resourceProfile;
+
                private SingleTaskSlot(
                                SlotRequestId slotRequestId,
+                               ResourceProfile resourceProfile,
                                AbstractID groupId,
                                MultiTaskSlot parent,
                                Locality locality) {
                        super(slotRequestId, groupId);
 
+                       this.resourceProfile = 
Preconditions.checkNotNull(resourceProfile);
                        this.parent = Preconditions.checkNotNull(parent);
 
                        Preconditions.checkNotNull(locality);
@@ -581,6 +625,11 @@ public class SlotSharingManager {
                }
 
                @Override
+               public ResourceProfile getReservedResources() {
+                       return resourceProfile;
+               }
+
+               @Override
                public String toString() {
                        String logicalSlotString = "(pending)";
                        try {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index 8e8db1a..0742ac2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.api.common.resources.GPUResource;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
@@ -135,6 +136,7 @@ public class SlotSharingManagerTest extends TestLogger {
                SlotRequestId singleTaskSlotRequestId = new SlotRequestId();
                SlotSharingManager.SingleTaskSlot singleTaskSlot = 
rootSlot.allocateSingleTaskSlot(
                        singleTaskSlotRequestId,
+                       ResourceProfile.UNKNOWN,
                        singleTaskSlotGroupId,
                        Locality.LOCAL);
 
@@ -180,6 +182,7 @@ public class SlotSharingManagerTest extends TestLogger {
                SlotRequestId singleTaskSlotRequestId = new SlotRequestId();
                SlotSharingManager.SingleTaskSlot singleTaskSlot = 
rootSlot.allocateSingleTaskSlot(
                        singleTaskSlotRequestId,
+                       ResourceProfile.UNKNOWN,
                        new AbstractID(),
                        Locality.LOCAL);
 
@@ -237,6 +240,7 @@ public class SlotSharingManagerTest extends TestLogger {
 
                SlotSharingManager.SingleTaskSlot singleTaskSlot1 = 
multiTaskSlot.allocateSingleTaskSlot(
                        new SlotRequestId(),
+                       ResourceProfile.UNKNOWN,
                        new AbstractID(),
                        Locality.LOCAL);
 
@@ -284,12 +288,14 @@ public class SlotSharingManagerTest extends TestLogger {
                Locality locality1 = Locality.LOCAL;
                SlotSharingManager.SingleTaskSlot singleTaskSlot1 = 
rootSlot.allocateSingleTaskSlot(
                        new SlotRequestId(),
+                       ResourceProfile.UNKNOWN,
                        new AbstractID(),
                        locality1);
 
                Locality locality2 = Locality.HOST_LOCAL;
                SlotSharingManager.SingleTaskSlot singleTaskSlot2 = 
rootSlot.allocateSingleTaskSlot(
                        new SlotRequestId(),
+                       ResourceProfile.UNKNOWN,
                        new AbstractID(),
                        locality2);
 
@@ -314,6 +320,7 @@ public class SlotSharingManagerTest extends TestLogger {
                Locality locality3 = Locality.NON_LOCAL;
                SlotSharingManager.SingleTaskSlot singleTaskSlot3 = 
rootSlot.allocateSingleTaskSlot(
                        new SlotRequestId(),
+                       ResourceProfile.UNKNOWN,
                        new AbstractID(),
                        locality3);
 
@@ -349,6 +356,7 @@ public class SlotSharingManagerTest extends TestLogger {
 
                SlotSharingManager.SingleTaskSlot singleTaskSlot = 
rootSlot.allocateSingleTaskSlot(
                        new SlotRequestId(),
+                       ResourceProfile.UNKNOWN,
                        new AbstractID(),
                        Locality.LOCAL);
 
@@ -435,6 +443,7 @@ public class SlotSharingManagerTest extends TestLogger {
                // occupy the resolved root slot
                resolvedMultiTaskSlot.allocateSingleTaskSlot(
                        new SlotRequestId(),
+                       ResourceProfile.UNKNOWN,
                        groupId,
                        Locality.UNCONSTRAINED);
 
@@ -491,6 +500,7 @@ public class SlotSharingManagerTest extends TestLogger {
                // occupy the slot
                resolvedRootSlot.allocateSingleTaskSlot(
                        new SlotRequestId(),
+                       ResourceProfile.UNKNOWN,
                        groupId,
                        slotInfoAndLocality.getLocality());
 
@@ -525,6 +535,7 @@ public class SlotSharingManagerTest extends TestLogger {
                // occupy the unresolved slot
                unresolvedRootSlot.allocateSingleTaskSlot(
                        new SlotRequestId(),
+                       ResourceProfile.UNKNOWN,
                        groupId,
                        Locality.UNKNOWN);
 
@@ -533,4 +544,65 @@ public class SlotSharingManagerTest extends TestLogger {
                // we should no longer have a free unresolved root slot
                assertNull(unresolvedRootSlot1);
        }
+
+       @Test
+       public void testResourceCalculationOnSlotAllocatingAndReleasing() {
+               ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 
100, 100, Collections.emptyMap());
+               ResourceProfile rp2 = new ResourceProfile(2.0, 200, 200, 200, 
200, 200, Collections.singletonMap("gpu", new GPUResource(2.0)));
+               ResourceProfile rp3 = new ResourceProfile(3.0, 300, 300, 300, 
300, 300, Collections.singletonMap("gpu", new GPUResource(3.0)));
+
+               final TestingAllocatedSlotActions allocatedSlotActions = new 
TestingAllocatedSlotActions();
+
+               SlotSharingManager slotSharingManager = new SlotSharingManager(
+                               SLOT_SHARING_GROUP_ID,
+                               allocatedSlotActions,
+                               SLOT_OWNER);
+
+               SlotSharingManager.MultiTaskSlot unresolvedRootSlot = 
slotSharingManager.createRootSlot(
+                               new SlotRequestId(),
+                               new CompletableFuture<>(),
+                               new SlotRequestId());
+
+               // Allocates the left subtree.
+               SlotSharingManager.MultiTaskSlot leftMultiTaskSlot =
+                               unresolvedRootSlot.allocateMultiTaskSlot(new 
SlotRequestId(), new SlotSharingGroupId());
+
+               SlotSharingManager.SingleTaskSlot firstChild = 
leftMultiTaskSlot.allocateSingleTaskSlot(
+                               new SlotRequestId(),
+                               rp1,
+                               new SlotSharingGroupId(),
+                               Locality.LOCAL);
+               SlotSharingManager.SingleTaskSlot secondChild = 
leftMultiTaskSlot.allocateSingleTaskSlot(
+                               new SlotRequestId(),
+                               rp2,
+                               new SlotSharingGroupId(),
+                               Locality.LOCAL);
+
+               assertEquals(rp1, firstChild.getReservedResources());
+               assertEquals(rp2, secondChild.getReservedResources());
+               assertEquals(rp1.merge(rp2), 
leftMultiTaskSlot.getReservedResources());
+               assertEquals(rp1.merge(rp2), 
unresolvedRootSlot.getReservedResources());
+
+               // Allocates the right subtree.
+               SlotSharingManager.SingleTaskSlot thirdChild = 
unresolvedRootSlot.allocateSingleTaskSlot(
+                               new SlotRequestId(),
+                               rp3,
+                               new SlotSharingGroupId(),
+                               Locality.LOCAL);
+               assertEquals(rp3, thirdChild.getReservedResources());
+               assertEquals(rp1.merge(rp2).merge(rp3), 
unresolvedRootSlot.getReservedResources());
+
+               // Releases the second child in the left-side tree.
+               secondChild.release(new Throwable("Release for testing"));
+               assertEquals(rp1, leftMultiTaskSlot.getReservedResources());
+               assertEquals(rp1.merge(rp3), 
unresolvedRootSlot.getReservedResources());
+
+               // Releases the third child in the right-side tree.
+               thirdChild.release(new Throwable("Release for testing"));
+               assertEquals(rp1, unresolvedRootSlot.getReservedResources());
+
+               // Releases the first child in the left-side tree.
+               firstChild.release(new Throwable("Release for testing"));
+               assertEquals(ResourceProfile.ZERO, 
unresolvedRootSlot.getReservedResources());
+       }
 }

Reply via email to