tillrohrmann commented on a change in pull request #14897:
URL: https://github.com/apache/flink/pull/14897#discussion_r578344017



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -600,61 +603,69 @@ private void allocateSlot(
         FutureUtils.assertNoException(slotAllocationResponseProcessingFuture);
     }
 
-    private void tryFulfillRequirementsWithPendingSlots(
+    private ResourceCounter tryFulfillRequirementsWithPendingSlots(
             JobID jobId,
-            Map<ResourceProfile, Integer> missingResources,
+            Collection<Map.Entry<ResourceProfile, Integer>> missingResources,
             ResourceCounter pendingSlots) {
-        for (Map.Entry<ResourceProfile, Integer> missingResource : 
missingResources.entrySet()) {
+        for (Map.Entry<ResourceProfile, Integer> missingResource : 
missingResources) {
             ResourceProfile profile = missingResource.getKey();
             for (int i = 0; i < missingResource.getValue(); i++) {
-                if (!tryFulfillWithPendingSlots(profile, pendingSlots)) {
-                    boolean couldAllocateWorkerAndReserveSlot =
+                final Tuple2<Boolean, ResourceCounter> matchingResult =
+                        tryFulfillWithPendingSlots(profile, pendingSlots);
+                pendingSlots = matchingResult.f1;
+                if (!matchingResult.f0) {
+                    final Tuple2<Boolean, ResourceCounter> allocationResult =
                             tryAllocateWorkerAndReserveSlot(profile, 
pendingSlots);
+                    pendingSlots = allocationResult.f1;
+                    boolean couldAllocateWorkerAndReserveSlot = 
allocationResult.f0;
                     if (!couldAllocateWorkerAndReserveSlot && 
sendNotEnoughResourceNotifications) {
                         LOG.warn("Could not fulfill resource requirements of 
job {}.", jobId);
                         resourceActions.notifyNotEnoughResourcesAvailable(
                                 jobId, 
resourceTracker.getAcquiredResources(jobId));
-                        return;
+                        return pendingSlots;
                     }
                 }
             }
         }
+        return pendingSlots;
     }
 
-    private boolean tryFulfillWithPendingSlots(
+    private Tuple2<Boolean, ResourceCounter> tryFulfillWithPendingSlots(

Review comment:
       I would recommend using an explicit type. Something like 
`MatchingResult` which tells you whether the matching was successful or not. In 
the former case, it also gives you the newly available pendingSlots back.
   
   The problem with `Tuple2` is that it is not very expressive. You only have 
`Tuple2.f0` and `Tuple2.f1`. This is not very helpful when someone new to the 
code wants to understand the code. Having something like `MatchingResult` with 
`MatchingResult.isSuccess` or `MatchingResult.isSuccessfulMatching` tells you 
more about the what the intention is.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceCounter.java
##########
@@ -53,6 +53,16 @@ public int getResourceCount(ResourceProfile resourceProfile) 
{
         return resources.getOrDefault(resourceProfile, 0);
     }
 
+    /**
+     * Adds increment to this resource counter value and returns the resulting 
value. Number of
+     * resources of all the {@link ResourceProfile} in this counter.
+     *
+     * @return number of resources of all the {@link ResourceProfile} in this 
counter
+     */
+    public int getResourceCount() {

Review comment:
       Maybe call it `getTotalResourceCount`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMapping.java
##########
@@ -74,23 +81,26 @@ private static void internalDecrementCount(
                             "Attempting to decrement count of %s->%s, but 
primary key was unknown.",
                             resourceProfile,
                             secondaryKey);
-                    resourceCounter.decrementCount(secondaryKey, decrement);
-                    return resourceCounter.isEmpty() ? null : resourceCounter;
+                    final ResourceCounter newCounter =
+                            resourceCounter.subtract(secondaryKey, decrement);
+                    return newCounter.isEmpty() ? null : newCounter;
                 });
     }
 
-    public Map<ResourceProfile, Integer> 
getResourcesFulfilling(ResourceProfile requirement) {
+    public Collection<Map.Entry<ResourceProfile, Integer>> 
getResourcesFulfilling(

Review comment:
       Instead of returning `Collection<Map.Entry<ResourceProfile, Integer>>` 
couldn't we also return a `ResourceCounter`? Given that this class is immutable 
it should not be a problem to hand it out.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMapping.java
##########
@@ -74,23 +81,26 @@ private static void internalDecrementCount(
                             "Attempting to decrement count of %s->%s, but 
primary key was unknown.",
                             resourceProfile,
                             secondaryKey);
-                    resourceCounter.decrementCount(secondaryKey, decrement);
-                    return resourceCounter.isEmpty() ? null : resourceCounter;
+                    final ResourceCounter newCounter =
+                            resourceCounter.subtract(secondaryKey, decrement);
+                    return newCounter.isEmpty() ? null : newCounter;
                 });
     }
 
-    public Map<ResourceProfile, Integer> 
getResourcesFulfilling(ResourceProfile requirement) {
+    public Collection<Map.Entry<ResourceProfile, Integer>> 
getResourcesFulfilling(
+            ResourceProfile requirement) {
         Preconditions.checkNotNull(requirement);
         return requirementToFulfillingResources
-                .getOrDefault(requirement, ResourceCounter.EMPTY)
-                .getResourceProfilesWithCount();
+                .getOrDefault(requirement, ResourceCounter.empty())
+                .getResourcesWithCount();
     }
 
-    public Map<ResourceProfile, Integer> 
getRequirementsFulfilledBy(ResourceProfile resource) {
+    public Collection<Map.Entry<ResourceProfile, Integer>> 
getRequirementsFulfilledBy(

Review comment:
       Same here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceCounter.java
##########
@@ -53,6 +53,16 @@ public int getResourceCount(ResourceProfile resourceProfile) 
{
         return resources.getOrDefault(resourceProfile, 0);
     }
 
+    /**
+     * Adds increment to this resource counter value and returns the resulting 
value. Number of
+     * resources of all the {@link ResourceProfile} in this counter.

Review comment:
       ```suggestion
        * Computes the total number of resources in this counter.
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -600,61 +603,69 @@ private void allocateSlot(
         FutureUtils.assertNoException(slotAllocationResponseProcessingFuture);
     }
 
-    private void tryFulfillRequirementsWithPendingSlots(
+    private ResourceCounter tryFulfillRequirementsWithPendingSlots(
             JobID jobId,
-            Map<ResourceProfile, Integer> missingResources,
+            Collection<Map.Entry<ResourceProfile, Integer>> missingResources,
             ResourceCounter pendingSlots) {
-        for (Map.Entry<ResourceProfile, Integer> missingResource : 
missingResources.entrySet()) {
+        for (Map.Entry<ResourceProfile, Integer> missingResource : 
missingResources) {
             ResourceProfile profile = missingResource.getKey();
             for (int i = 0; i < missingResource.getValue(); i++) {
-                if (!tryFulfillWithPendingSlots(profile, pendingSlots)) {
-                    boolean couldAllocateWorkerAndReserveSlot =
+                final Tuple2<Boolean, ResourceCounter> matchingResult =
+                        tryFulfillWithPendingSlots(profile, pendingSlots);
+                pendingSlots = matchingResult.f1;
+                if (!matchingResult.f0) {
+                    final Tuple2<Boolean, ResourceCounter> allocationResult =
                             tryAllocateWorkerAndReserveSlot(profile, 
pendingSlots);
+                    pendingSlots = allocationResult.f1;
+                    boolean couldAllocateWorkerAndReserveSlot = 
allocationResult.f0;
                     if (!couldAllocateWorkerAndReserveSlot && 
sendNotEnoughResourceNotifications) {
                         LOG.warn("Could not fulfill resource requirements of 
job {}.", jobId);
                         resourceActions.notifyNotEnoughResourcesAvailable(
                                 jobId, 
resourceTracker.getAcquiredResources(jobId));
-                        return;
+                        return pendingSlots;
                     }
                 }
             }
         }
+        return pendingSlots;
     }
 
-    private boolean tryFulfillWithPendingSlots(
+    private Tuple2<Boolean, ResourceCounter> tryFulfillWithPendingSlots(
             ResourceProfile resourceProfile, ResourceCounter pendingSlots) {
-        Set<ResourceProfile> pendingSlotProfiles = 
pendingSlots.getResourceProfiles();
+        Set<ResourceProfile> pendingSlotProfiles = pendingSlots.getResources();
 
         // short-cut, pretty much only applicable to fine-grained resource 
management
         if (pendingSlotProfiles.contains(resourceProfile)) {
-            pendingSlots.decrementCount(resourceProfile, 1);
-            return true;
+            pendingSlots = pendingSlots.subtract(resourceProfile, 1);
+            return Tuple2.of(true, pendingSlots);
         }
 
         for (ResourceProfile pendingSlotProfile : pendingSlotProfiles) {
             if (pendingSlotProfile.isMatching(resourceProfile)) {
-                pendingSlots.decrementCount(pendingSlotProfile, 1);
-                return true;
+                pendingSlots = pendingSlots.subtract(pendingSlotProfile, 1);
+                return Tuple2.of(true, pendingSlots);
             }
         }
 
-        return false;
+        return Tuple2.of(false, pendingSlots);
     }
 
-    private boolean tryAllocateWorkerAndReserveSlot(
+    private Tuple2<Boolean, ResourceCounter> tryAllocateWorkerAndReserveSlot(

Review comment:
       Same here. I would suggest to introduce something like 
`WorkerAllocationResult` or so.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to