This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d676a355e910966fc878bde62d73c1721c79942
Author: David Moravek <d...@apache.org>
AuthorDate: Mon Mar 27 13:21:15 2023 +0200

    [FLINK-31399] Move desiredResources out of WaitingForResources
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 27 ++++++---
 .../scheduler/adaptive/WaitingForResources.java    | 18 +-----
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 67 +++++++++-------------
 .../adaptive/WaitingForResourcesTest.java          | 49 ++++------------
 .../scheduler/adaptive/allocator/TestSlotInfo.java | 16 +++++-
 5 files changed, 74 insertions(+), 103 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index c028fd05ecf..e3499f231bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -217,6 +217,7 @@ public class AdaptiveScheduler
     private final DeploymentStateTimeMetrics deploymentTimeMetrics;
 
     private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;
+    private ResourceCounter desiredResources = ResourceCounter.empty();
 
     private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
 
@@ -734,12 +735,17 @@ public class AdaptiveScheduler
     // ----------------------------------------------------------------
 
     @Override
-    public boolean hasDesiredResources(ResourceCounter desiredResources) {
-        final Collection<? extends SlotInfo> allSlots =
+    public boolean hasDesiredResources() {
+        final Collection<? extends SlotInfo> freeSlots =
                 declarativeSlotPool.getFreeSlotsInformation();
-        ResourceCounter outstandingResources = desiredResources;
+        return hasDesiredResources(desiredResources, freeSlots);
+    }
 
-        final Iterator<? extends SlotInfo> slotIterator = allSlots.iterator();
+    @VisibleForTesting
+    static boolean hasDesiredResources(
+            ResourceCounter desiredResources, Collection<? extends SlotInfo> freeSlots) {
+        ResourceCounter outstandingResources = desiredResources;
+        final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
 
         while (!outstandingResources.isEmpty() && slotIterator.hasNext()) {
             final SlotInfo slotInfo = slotIterator.next();
@@ -791,19 +797,26 @@ public class AdaptiveScheduler
 
     @Override
     public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) {
-        final ResourceCounter desiredResources = calculateDesiredResources();
-        declarativeSlotPool.setResourceRequirements(desiredResources);
+        declareDesiredResources();
 
         transitionToState(
                 new WaitingForResources.Factory(
                         this,
                         LOG,
-                        desiredResources,
                         this.initialResourceAllocationTimeout,
                         this.resourceStabilizationTimeout,
                         previousExecutionGraph));
     }
 
+    private void declareDesiredResources() {
+        final ResourceCounter newDesiredResources = calculateDesiredResources();
+
+        if (!newDesiredResources.equals(this.desiredResources)) {
+            this.desiredResources = newDesiredResources;
+            declarativeSlotPool.setResourceRequirements(this.desiredResources);
+        }
+    }
+
     private ResourceCounter calculateDesiredResources() {
         return slotAllocator.calculateRequiredSlots(jobInformation.getVertices());
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index 62b20919ee1..b15fc25776b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
@@ -44,7 +43,6 @@ class WaitingForResources implements State, ResourceListener {
 
     private final Logger log;
 
-    private final ResourceCounter desiredResources;
     private final Clock clock;
 
     /** If set, there's an ongoing deadline waiting for a resource stabilization. */
@@ -60,13 +58,11 @@ class WaitingForResources implements State, ResourceListener {
     WaitingForResources(
             Context context,
             Logger log,
-            ResourceCounter desiredResources,
             Duration initialResourceAllocationTimeout,
             Duration resourceStabilizationTimeout) {
         this(
                 context,
                 log,
-                desiredResources,
                 initialResourceAllocationTimeout,
                 resourceStabilizationTimeout,
                 SystemClock.getInstance(),
@@ -76,22 +72,17 @@ class WaitingForResources implements State, ResourceListener {
     WaitingForResources(
             Context context,
             Logger log,
-            ResourceCounter desiredResources,
             Duration initialResourceAllocationTimeout,
             Duration resourceStabilizationTimeout,
             Clock clock,
             @Nullable ExecutionGraph previousExecutionGraph) {
         this.context = Preconditions.checkNotNull(context);
         this.log = Preconditions.checkNotNull(log);
-        this.desiredResources = Preconditions.checkNotNull(desiredResources);
         this.resourceStabilizationTimeout =
                 Preconditions.checkNotNull(resourceStabilizationTimeout);
         this.clock = clock;
         Preconditions.checkNotNull(initialResourceAllocationTimeout);
 
-        Preconditions.checkArgument(
-                !desiredResources.isEmpty(), "Desired resources must not be empty");
-
         Preconditions.checkArgument(
                 !resourceStabilizationTimeout.isNegative(),
                 "Resource stabilization timeout must not be negative");
@@ -154,7 +145,7 @@ class WaitingForResources implements State, ResourceListener {
     }
 
     private void checkDesiredOrSufficientResourcesAvailable() {
-        if (context.hasDesiredResources(desiredResources)) {
+        if (context.hasDesiredResources()) {
             createExecutionGraphWithAvailableResources();
             return;
         }
@@ -207,10 +198,9 @@ class WaitingForResources implements State, ResourceListener {
         /**
          * Checks whether we have the desired resources.
          *
-         * @param desiredResources desiredResources describing the desired resources
          * @return {@code true} if we have enough resources; otherwise {@code false}
          */
-        boolean hasDesiredResources(ResourceCounter desiredResources);
+        boolean hasDesiredResources();
 
         /**
          * Checks if we currently have sufficient resources for executing the job.
@@ -235,7 +225,6 @@ class WaitingForResources implements State, ResourceListener {
 
         private final Context context;
         private final Logger log;
-        private final ResourceCounter desiredResources;
         private final Duration initialResourceAllocationTimeout;
         private final Duration resourceStabilizationTimeout;
         @Nullable private final ExecutionGraph previousExecutionGraph;
@@ -243,13 +232,11 @@ class WaitingForResources implements State, ResourceListener {
         public Factory(
                 Context context,
                 Logger log,
-                ResourceCounter desiredResources,
                 Duration initialResourceAllocationTimeout,
                 Duration resourceStabilizationTimeout,
                 ExecutionGraph previousExecutionGraph) {
             this.context = context;
             this.log = log;
-            this.desiredResources = desiredResources;
             this.initialResourceAllocationTimeout = initialResourceAllocationTimeout;
             this.resourceStabilizationTimeout = resourceStabilizationTimeout;
             this.previousExecutionGraph = previousExecutionGraph;
@@ -263,7 +250,6 @@ class WaitingForResources implements State, ResourceListener {
             return new WaitingForResources(
                     context,
                     log,
-                    desiredResources,
                     initialResourceAllocationTimeout,
                     resourceStabilizationTimeout,
                     SystemClock.getInstance(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index de5126c1175..e9fa605e09b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -80,6 +80,7 @@ import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.TestSlotInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
@@ -108,7 +109,10 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -217,56 +221,40 @@ public class AdaptiveSchedulerTest {
     }
 
     @Test
-    void testHasEnoughResourcesReturnsFalseIfUnsatisfied() throws Exception {
-        final AdaptiveScheduler scheduler =
-                new AdaptiveSchedulerBuilder(createJobGraph(), 
mainThreadExecutor)
-                        .build(EXECUTOR_RESOURCE.getExecutor());
-
-        scheduler.startScheduling();
-
+    void testHasEnoughResourcesReturnsFalseIfUnsatisfied() {
         final ResourceCounter resourceRequirement =
                 ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
-
-        
assertThat(scheduler.hasDesiredResources(resourceRequirement)).isFalse();
+        assertThat(
+                        AdaptiveScheduler.hasDesiredResources(
+                                resourceRequirement, Collections.emptyList()))
+                .isFalse();
     }
 
     @Test
-    void testHasEnoughResourcesReturnsTrueIfSatisfied() throws Exception {
-        final JobGraph jobGraph = createJobGraph();
-
-        final DefaultDeclarativeSlotPool declarativeSlotPool =
-                createDeclarativeSlotPool(jobGraph.getJobID());
-
-        final AdaptiveScheduler scheduler =
-                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
-                        .setDeclarativeSlotPool(declarativeSlotPool)
-                        .build(EXECUTOR_RESOURCE.getExecutor());
-
-        scheduler.startScheduling();
-
+    void testHasEnoughResourcesReturnsTrueIfSatisfied() {
         final ResourceCounter resourceRequirement =
                 ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+        final Collection<TestSlotInfo> freeSlots =
+                createSlotInfosForResourceRequirements(resourceRequirement);
+        assertThat(AdaptiveScheduler.hasDesiredResources(resourceRequirement, 
freeSlots)).isTrue();
+    }
 
-        offerSlots(
-                declarativeSlotPool, 
createSlotOffersForResourceRequirements(resourceRequirement));
+    private Collection<TestSlotInfo> createSlotInfosForResourceRequirements(
+            ResourceCounter resourceRequirements) {
+        final Collection<TestSlotInfo> slotInfos = new ArrayList<>();
 
-        
assertThat(scheduler.hasDesiredResources(resourceRequirement)).isTrue();
+        for (Map.Entry<ResourceProfile, Integer> resourceProfileCount :
+                resourceRequirements.getResourcesWithCount()) {
+            for (int i = 0; i < resourceProfileCount.getValue(); i++) {
+                slotInfos.add(new TestSlotInfo(resourceProfileCount.getKey()));
+            }
+        }
+
+        return slotInfos;
     }
 
     @Test
     void testHasEnoughResourcesUsesUnmatchedSlotsAsUnknown() throws Exception {
-        final JobGraph jobGraph = createJobGraph();
-
-        final DefaultDeclarativeSlotPool declarativeSlotPool =
-                createDeclarativeSlotPool(jobGraph.getJobID());
-
-        final AdaptiveScheduler scheduler =
-                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
-                        .setDeclarativeSlotPool(declarativeSlotPool)
-                        .build(EXECUTOR_RESOURCE.getExecutor());
-
-        scheduler.startScheduling();
-
         final int numRequiredSlots = 1;
         final ResourceCounter requiredResources =
                 ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numRequiredSlots);
@@ -274,9 +262,10 @@ public class AdaptiveSchedulerTest {
                 ResourceCounter.withResource(
                         ResourceProfile.newBuilder().setCpuCores(1).build(), 
numRequiredSlots);
 
-        offerSlots(declarativeSlotPool, 
createSlotOffersForResourceRequirements(providedResources));
+        final Collection<TestSlotInfo> freeSlots =
+                createSlotInfosForResourceRequirements(providedResources);
 
-        assertThat(scheduler.hasDesiredResources(requiredResources)).isTrue();
+        assertThat(AdaptiveScheduler.hasDesiredResources(requiredResources, 
freeSlots)).isTrue();
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index 202d0fdbd5f..536d4b4a6e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -20,12 +20,10 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.testutils.ScheduledTask;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.ManualClock;
@@ -55,9 +53,6 @@ import static org.junit.Assert.fail;
 
 /** Tests for the WaitingForResources state. */
 public class WaitingForResourcesTest extends TestLogger {
-    private static final ResourceCounter RESOURCE_COUNTER =
-            ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
-
     private static final Duration STABILIZATION_TIMEOUT = 
Duration.ofSeconds(1);
 
     /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
@@ -68,8 +63,7 @@ public class WaitingForResourcesTest extends TestLogger {
 
             ctx.setExpectCreatingExecutionGraph();
 
-            new WaitingForResources(
-                    ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+            new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
 
             ctx.runScheduledTasks();
         }
@@ -80,8 +74,7 @@ public class WaitingForResourcesTest extends TestLogger {
         try (MockContext ctx = new MockContext()) {
             ctx.setHasDesiredResources(() -> false);
             WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
 
             // we expect no state transition.
             wfr.onNewResourcesAvailable();
@@ -93,8 +86,7 @@ public class WaitingForResourcesTest extends TestLogger {
         try (MockContext ctx = new MockContext()) {
             ctx.setHasDesiredResources(() -> false); // initially, not enough 
resources
             WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
             ctx.setHasDesiredResources(() -> true); // make resources available
             ctx.setExpectCreatingExecutionGraph();
             wfr.onNewResourcesAvailable(); // .. and notify
@@ -108,11 +100,7 @@ public class WaitingForResourcesTest extends TestLogger {
             Duration noStabilizationTimeout = Duration.ofMillis(0);
             WaitingForResources wfr =
                     new WaitingForResources(
-                            ctx,
-                            log,
-                            RESOURCE_COUNTER,
-                            Duration.ofSeconds(1000),
-                            noStabilizationTimeout);
+                            ctx, log, Duration.ofSeconds(1000), 
noStabilizationTimeout);
 
             ctx.setHasDesiredResources(() -> false);
             ctx.setHasSufficientResources(() -> true);
@@ -129,11 +117,7 @@ public class WaitingForResourcesTest extends TestLogger {
 
             WaitingForResources wfr =
                     new WaitingForResources(
-                            ctx,
-                            log,
-                            RESOURCE_COUNTER,
-                            Duration.ofSeconds(1000),
-                            stabilizationTimeout);
+                            ctx, log, Duration.ofSeconds(1000), 
stabilizationTimeout);
 
             ctx.setHasDesiredResources(() -> false);
             ctx.setHasSufficientResources(() -> true);
@@ -155,7 +139,6 @@ public class WaitingForResourcesTest extends TestLogger {
                     new WaitingForResources(
                             ctx,
                             log,
-                            RESOURCE_COUNTER,
                             initialResourceTimeout,
                             stabilizationTimeout,
                             ctx.getClock(),
@@ -190,7 +173,6 @@ public class WaitingForResourcesTest extends TestLogger {
                     new WaitingForResources(
                             ctx,
                             log,
-                            RESOURCE_COUNTER,
                             initialResourceTimeout,
                             stabilizationTimeout,
                             ctx.getClock(),
@@ -232,12 +214,7 @@ public class WaitingForResourcesTest extends TestLogger {
         try (MockContext ctx = new MockContext()) {
             ctx.setHasDesiredResources(() -> false);
             WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx,
-                            log,
-                            RESOURCE_COUNTER,
-                            Duration.ofMillis(-1),
-                            STABILIZATION_TIMEOUT);
+                    new WaitingForResources(ctx, log, Duration.ofMillis(-1), 
STABILIZATION_TIMEOUT);
 
             ctx.runScheduledTasks();
             assertThat(ctx.hasStateTransition(), is(false));
@@ -249,8 +226,7 @@ public class WaitingForResourcesTest extends TestLogger {
         try (MockContext ctx = new MockContext()) {
             ctx.setHasDesiredResources(() -> false);
             WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
 
             ctx.setExpectCreatingExecutionGraph();
 
@@ -264,8 +240,7 @@ public class WaitingForResourcesTest extends TestLogger {
         try (MockContext ctx = new MockContext()) {
             ctx.setHasDesiredResources(() -> false);
             WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
 
             ctx.setExpectFinished(
                     archivedExecutionGraph -> {
@@ -287,8 +262,7 @@ public class WaitingForResourcesTest extends TestLogger {
         try (MockContext ctx = new MockContext()) {
             ctx.setHasDesiredResources(() -> false);
             WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
 
             ctx.setExpectFinished(
                     (archivedExecutionGraph -> {
@@ -303,8 +277,7 @@ public class WaitingForResourcesTest extends TestLogger {
         try (MockContext ctx = new MockContext()) {
             ctx.setHasDesiredResources(() -> false);
             WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
 
             ctx.setExpectFinished(
                     (archivedExecutionGraph -> {
@@ -480,7 +453,7 @@ public class WaitingForResourcesTest extends TestLogger {
         }
 
         @Override
-        public boolean hasDesiredResources(ResourceCounter desiredResources) {
+        public boolean hasDesiredResources() {
             return hasDesiredResourcesSupplier.get();
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
index e467554ddfa..9f3573a0705 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
@@ -24,16 +24,26 @@ import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 /** Test {@link SlotInfo} implementation. */
-class TestSlotInfo implements SlotInfo {
+public class TestSlotInfo implements SlotInfo {
 
     private final AllocationID allocationId;
+    private final ResourceProfile resourceProfile;
 
     public TestSlotInfo() {
-        this(new AllocationID());
+        this(new AllocationID(), ResourceProfile.ANY);
     }
 
     public TestSlotInfo(AllocationID allocationId) {
+        this(allocationId, ResourceProfile.ANY);
+    }
+
+    public TestSlotInfo(ResourceProfile resourceProfile) {
+        this(new AllocationID(), resourceProfile);
+    }
+
+    public TestSlotInfo(AllocationID allocationId, ResourceProfile resourceProfile) {
         this.allocationId = allocationId;
+        this.resourceProfile = resourceProfile;
     }
 
     @Override
@@ -53,7 +63,7 @@ class TestSlotInfo implements SlotInfo {
 
     @Override
     public ResourceProfile getResourceProfile() {
-        return ResourceProfile.ANY;
+        return resourceProfile;
     }
 
     @Override

Reply via email to