This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3efbae44b4a21c9735f1a3054c276c32bbc46cdd
Author: Weihua Hu <huweihua....@gmail.com>
AuthorDate: Wed Jun 21 16:13:50 2023 +0800

    [FLINK-31843][runtime] Introduce FreeSlotInfoTracker to manage available 
slots for SlotSelectionStrategy
---
 .../slotpool/DefaultFreeSlotInfoTracker.java       | 100 +++++++++++
 .../jobmaster/slotpool/FreeSlotInfoTracker.java    |  86 +++++++++
 .../slotpool/DefaultFreeSlotInfoTrackerTest.java   |  99 +++++++++++
 .../slotpool/FreeSlotInfoTrackerTestUtils.java     |  42 +++++
 .../slotpool/TestingFreeSlotInfoTracker.java       | 192 +++++++++++++++++++++
 5 files changed, 519 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java
new file mode 100644
index 00000000000..0a394254992
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FreeSlotInfoTracker}, backed by its own copy of the free slot ids. */
+public class DefaultFreeSlotInfoTracker implements FreeSlotInfoTracker {
+    private final Set<AllocationID> freeSlots; // ids still free; reserveSlot() removes entries
+    private final Function<AllocationID, SlotInfo> slotInfoLookup;
+    private final Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> 
freeSlotInfoLookup;
+    private final Function<ResourceID, Double> taskExecutorUtilizationLookup;
+
+    public DefaultFreeSlotInfoTracker(
+            Set<AllocationID> freeSlots,
+            Function<AllocationID, SlotInfo> slotInfoLookup,
+            Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> 
freeSlotInfoLookup,
+            Function<ResourceID, Double> taskExecutorUtilizationLookup) {
+        this.freeSlots = new HashSet<>(freeSlots); // own copy, so reserveSlot() never mutates the caller's set
+        this.slotInfoLookup = slotInfoLookup;
+        this.freeSlotInfoLookup = freeSlotInfoLookup;
+        this.taskExecutorUtilizationLookup = taskExecutorUtilizationLookup;
+    }
+
+    @Override
+    public Set<AllocationID> getAvailableSlots() {
+        return Collections.unmodifiableSet(freeSlots); // read-only view, not a snapshot
+    }
+
+    @Override
+    public SlotInfo getSlotInfo(AllocationID allocationId) {
+        return Preconditions.checkNotNull(slotInfoLookup.apply(allocationId)); // the lookup must know this slot
+    }
+
+    @Override
+    public Collection<AllocatedSlotPool.FreeSlotInfo> 
getFreeSlotsWithIdleSinceInformation() {
+        return 
freeSlots.stream().map(freeSlotInfoLookup).collect(Collectors.toList());
+    }
+
+    @Override
+    public Collection<SlotInfo> getFreeSlotsInformation() {
+        return 
freeSlots.stream().map(slotInfoLookup).collect(Collectors.toList());
+    }
+
+    @Override
+    public double getTaskExecutorUtilization(SlotInfo slotInfo) {
+        ResourceID resourceId = 
slotInfo.getTaskManagerLocation().getResourceID();
+        return taskExecutorUtilizationLookup.apply(resourceId); // auto-unboxes the Double result
+    }
+
+    @Override
+    public void reserveSlot(AllocationID allocationId) {
+        Preconditions.checkState(
+                freeSlots.remove(allocationId), // false if the id was not (or is no longer) free
+                "Slot %s does not exist in free slots",
+                allocationId);
+    }
+
+    @Override
+    public DefaultFreeSlotInfoTracker 
createNewFreeSlotInfoTrackerWithoutBlockedSlots(
+            Set<AllocationID> blockedSlots) {
+
+        Set<AllocationID> freeSlotInfoTrackerWithoutBlockedSlots =
+                freeSlots.stream()
+                        .filter(slot -> !blockedSlots.contains(slot))
+                        .collect(Collectors.toSet());
+
+        return new DefaultFreeSlotInfoTracker( // reuses this tracker's lookup functions; only the id set differs
+                freeSlotInfoTrackerWithoutBlockedSlots,
+                slotInfoLookup,
+                freeSlotInfoLookup,
+                taskExecutorUtilizationLookup);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java
new file mode 100644
index 00000000000..b685311fbd9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import java.util.Collection;
+import java.util.Set;
+
+/** Tracks all free slots and supports slot bookkeeping for {@link 
SlotSelectionStrategy}. */
+public interface FreeSlotInfoTracker {
+
+    /**
+     * Gets the allocation ids of all available slots.
+     *
+     * @return allocation ids of the available slots
+     */
+    Set<AllocationID> getAvailableSlots();
+
+    /**
+     * Gets the slot info for an allocation id; the slot must exist.
+     *
+     * @param allocationId allocation id of the slot to look up
+     * @return slot info for the allocation id
+     */
+    SlotInfo getSlotInfo(AllocationID allocationId);
+
+    /**
+     * Returns a collection of {@link AllocatedSlotPool.FreeSlotInfo} objects, 
which carry the
+     * idle-since information, for all slots that are currently available in the slot pool.
+     *
+     * @return a collection of {@link AllocatedSlotPool.FreeSlotInfo} objects for 
all slots that
+     *     are currently available in the slot pool.
+     */
+    Collection<AllocatedSlotPool.FreeSlotInfo> 
getFreeSlotsWithIdleSinceInformation();
+
+    /**
+     * Returns a collection of {@link SlotInfo} objects about all slots that are 
currently available in
+     * the slot pool.
+     *
+     * @return a collection of {@link SlotInfo} objects about all slots that are 
currently available in
+     *     the slot pool.
+     */
+    Collection<SlotInfo> getFreeSlotsInformation();
+
+    /**
+     * Gets the task executor utilization for the given slot.
+     *
+     * @param slotInfo slot whose task executor utilization is requested
+     * @return task executor utilization of this slot
+     */
+    double getTaskExecutorUtilization(SlotInfo slotInfo);
+
+    /**
+     * Reserves a free slot when it is used.
+     *
+     * @param allocationId allocation id of the slot to reserve
+     */
+    void reserveSlot(AllocationID allocationId);
+
+    /**
+     * Creates a new free slot tracker without the blocked slots.
+     *
+     * @param blockedSlots slots that should not be used
+     * @return the new free slot tracker
+     */
+    FreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots(
+            Set<AllocationID> blockedSlots);
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java
new file mode 100644
index 00000000000..58218e318f6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultFreeSlotInfoTracker}. */
+class DefaultFreeSlotInfoTrackerTest {
+
+    @Test
+    void testReserveSlot() {
+        final ResourceID resourceId = ResourceID.generate();
+        final SlotInfo slotInfo1 = createAllocatedSlot(resourceId);
+        final SlotInfo slotInfo2 = createAllocatedSlot(resourceId);
+        final Map<AllocationID, SlotInfo> slots = new HashMap<>();
+
+        slots.put(slotInfo1.getAllocationId(), slotInfo1);
+        slots.put(slotInfo2.getAllocationId(), slotInfo2);
+
+        final FreeSlotInfoTracker freeSlotInfoTracker =
+                
FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(slots);
+        for (AllocationID candidate : freeSlotInfoTracker.getAvailableSlots()) 
{
+            SlotInfo selectSlot = freeSlotInfoTracker.getSlotInfo(candidate);
+            
assertThat(slots.get(selectSlot.getAllocationId())).isEqualTo(selectSlot);
+            freeSlotInfoTracker.reserveSlot(selectSlot.getAllocationId());
+            break; // stop after reserving the first candidate
+        }
+
+        assertThat(freeSlotInfoTracker.getAvailableSlots())
+                .hasSize(1) // one of the two slots was reserved above
+                .containsAnyOf(slotInfo1.getAllocationId(), 
slotInfo2.getAllocationId());
+    }
+
+    @Test
+    void testCreatedFreeSlotInfoTrackerWithoutBlockedSlots() {
+        final ResourceID resourceId = ResourceID.generate();
+        final SlotInfo slotInfo1 = createAllocatedSlot(resourceId);
+        final SlotInfo slotInfo2 = createAllocatedSlot(resourceId);
+        final Map<AllocationID, SlotInfo> slots = new HashMap<>();
+
+        slots.put(slotInfo1.getAllocationId(), slotInfo1);
+        slots.put(slotInfo2.getAllocationId(), slotInfo2);
+
+        final FreeSlotInfoTracker freeSlotInfoTracker =
+                
FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(slots);
+        assertThat(freeSlotInfoTracker.getAvailableSlots()).hasSize(2);
+
+        final FreeSlotInfoTracker freeSlotInfoTrackerWithoutBlockedSlots =
+                
freeSlotInfoTracker.createNewFreeSlotInfoTrackerWithoutBlockedSlots(
+                        new HashSet<>(
+                                Arrays.asList(
+                                        slotInfo1.getAllocationId(), 
slotInfo2.getAllocationId())));
+        
assertThat(freeSlotInfoTrackerWithoutBlockedSlots.getAvailableSlots()).isEmpty(); // both slots were blocked
+    }
+
+    private AllocatedSlot createAllocatedSlot(ResourceID owner) { // new slot with a fresh AllocationID, located at the loopback address
+        return new AllocatedSlot(
+                new AllocationID(),
+                new TaskManagerLocation(owner, 
InetAddress.getLoopbackAddress(), 41),
+                0,
+                ResourceProfile.UNKNOWN,
+                new RpcTaskManagerGateway(
+                        new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(),
+                        JobMasterId.generate()));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java
new file mode 100644
index 00000000000..e13835ba687
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import java.util.Map;
+
+/** Utilities for creating {@link FreeSlotInfoTracker} instances in tests. */
+public class FreeSlotInfoTrackerTestUtils {
+    /**
+     * Creates a {@link DefaultFreeSlotInfoTracker} that tracks the provided slots.
+     *
+     * @param freeSlots slots to track, keyed by allocation id
+     * @return default free slot info tracker
+     */
+    public static DefaultFreeSlotInfoTracker createDefaultFreeSlotInfoTracker(
+            Map<AllocationID, SlotInfo> freeSlots) {
+        return new DefaultFreeSlotInfoTracker(
+                freeSlots.keySet(),
+                freeSlots::get,
+                id -> new 
TestingFreeSlotInfoTracker.TestingFreeSlotInfo(freeSlots.get(id)),
+                ignored -> 0d); // every task executor reports zero utilization
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java
new file mode 100644
index 00000000000..af55233bb48
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Testing implementation of {@link FreeSlotInfoTracker} whose methods delegate to injected callbacks. */
+public class TestingFreeSlotInfoTracker implements FreeSlotInfoTracker {
+    private final Supplier<Set<AllocationID>> getAvailableSlotsSupplier;
+    private final Function<AllocationID, SlotInfo> getSlotInfoFunction;
+    private final Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>>
+            getFreeSlotsWithIdleSinceInformationSupplier;
+    private final Supplier<Collection<SlotInfo>> 
getFreeSlotsInformationSupplier;
+    private final Function<SlotInfo, Double> 
getTaskExecutorUtilizationFunction;
+    private final Consumer<AllocationID> reserveSlotConsumer;
+    private final Function<Set<AllocationID>, FreeSlotInfoTracker>
+            createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction;
+
+    public TestingFreeSlotInfoTracker(
+            Supplier<Set<AllocationID>> getAvailableSlotsSupplier,
+            Function<AllocationID, SlotInfo> getSlotInfoFunction,
+            Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>>
+                    getFreeSlotsWithIdleSinceInformationSupplier,
+            Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier,
+            Function<SlotInfo, Double> getTaskExecutorUtilizationFunction,
+            Consumer<AllocationID> reserveSlotConsumer,
+            Function<Set<AllocationID>, FreeSlotInfoTracker>
+                    createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) {
+        this.getAvailableSlotsSupplier = getAvailableSlotsSupplier;
+        this.getSlotInfoFunction = getSlotInfoFunction;
+        this.getFreeSlotsWithIdleSinceInformationSupplier =
+                getFreeSlotsWithIdleSinceInformationSupplier;
+        this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier;
+        this.getTaskExecutorUtilizationFunction = 
getTaskExecutorUtilizationFunction;
+        this.reserveSlotConsumer = reserveSlotConsumer;
+        this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction =
+                createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction;
+    }
+
+    @Override
+    public Set<AllocationID> getAvailableSlots() {
+        return getAvailableSlotsSupplier.get();
+    }
+
+    @Override
+    public SlotInfo getSlotInfo(AllocationID allocationId) {
+        return getSlotInfoFunction.apply(allocationId);
+    }
+
+    @Override
+    public Collection<AllocatedSlotPool.FreeSlotInfo> 
getFreeSlotsWithIdleSinceInformation() {
+        return getFreeSlotsWithIdleSinceInformationSupplier.get();
+    }
+
+    @Override
+    public Collection<SlotInfo> getFreeSlotsInformation() {
+        return getFreeSlotsInformationSupplier.get();
+    }
+
+    @Override
+    public double getTaskExecutorUtilization(SlotInfo slotInfo) {
+        return getTaskExecutorUtilizationFunction.apply(slotInfo); // auto-unboxes the Double result
+    }
+
+    @Override
+    public void reserveSlot(AllocationID allocationId) {
+        reserveSlotConsumer.accept(allocationId);
+    }
+
+    @Override
+    public FreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots(
+            Set<AllocationID> blockedSlots) {
+        return 
createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction.apply(blockedSlots);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder of {@link TestingFreeSlotInfoTracker}; unset callbacks fall back to the defaults below. */
+    public static class Builder {
+        private Supplier<Set<AllocationID>> getAvailableSlotsSupplier = 
Collections::emptySet;
+        private Function<AllocationID, SlotInfo> getSlotInfoFunction = ignored 
-> null; // default: no slot info for any id
+        private Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>>
+                getFreeSlotsWithIdleSinceInformationSupplier = 
Collections::emptyList;
+        private Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier 
=
+                Collections::emptyList;
+        private Function<SlotInfo, Double> getTaskExecutorUtilizationFunction 
= ignored -> 0d;
+        private Consumer<AllocationID> reserveSlotConsumer = ignore -> {}; // default: reserving is a no-op
+        private Function<Set<AllocationID>, FreeSlotInfoTracker>
+                createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction = 
ignored -> null; // default: yields null instead of a tracker
+
+        public Builder setGetAvailableSlotsSupplier(
+                Supplier<Set<AllocationID>> getAvailableSlotsSupplier) {
+            this.getAvailableSlotsSupplier = getAvailableSlotsSupplier;
+            return this;
+        }
+
+        public Builder setGetSlotInfoFunction(
+                Function<AllocationID, SlotInfo> getSlotInfoFunction) {
+            this.getSlotInfoFunction = getSlotInfoFunction;
+            return this;
+        }
+
+        public Builder setGetFreeSlotsWithIdleSinceInformationSupplier(
+                Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>>
+                        getFreeSlotsWithIdleSinceInformationSupplier) {
+            this.getFreeSlotsWithIdleSinceInformationSupplier =
+                    getFreeSlotsWithIdleSinceInformationSupplier;
+            return this;
+        }
+
+        public Builder setGetFreeSlotsInformationSupplier(
+                Supplier<Collection<SlotInfo>> 
getFreeSlotsInformationSupplier) {
+            this.getFreeSlotsInformationSupplier = 
getFreeSlotsInformationSupplier;
+            return this;
+        }
+
+        public Builder setGetTaskExecutorUtilizationFunction(
+                Function<SlotInfo, Double> getTaskExecutorUtilizationFunction) 
{
+            this.getTaskExecutorUtilizationFunction = 
getTaskExecutorUtilizationFunction;
+            return this;
+        }
+
+        public Builder setReserveSlotConsumer(Consumer<AllocationID> 
reserveSlotConsumer) {
+            this.reserveSlotConsumer = reserveSlotConsumer;
+            return this;
+        }
+
+        public Builder 
setCreateNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction(
+                Function<Set<AllocationID>, FreeSlotInfoTracker>
+                        
createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) {
+            this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction =
+                    createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction;
+            return this;
+        }
+
+        public TestingFreeSlotInfoTracker build() {
+            return new TestingFreeSlotInfoTracker(
+                    getAvailableSlotsSupplier,
+                    getSlotInfoFunction,
+                    getFreeSlotsWithIdleSinceInformationSupplier,
+                    getFreeSlotsInformationSupplier,
+                    getTaskExecutorUtilizationFunction,
+                    reserveSlotConsumer,
+                    createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction);
+        }
+    }
+
+    /** Testing {@link AllocatedSlotPool.FreeSlotInfo} that reports zero utilization and a free-since value of 0. */
+    public static class TestingFreeSlotInfo implements 
AllocatedSlotPool.FreeSlotInfo {
+        private final SlotInfo slotInfo;
+
+        public TestingFreeSlotInfo(SlotInfo slotInfo) {
+            this.slotInfo = slotInfo;
+        }
+
+        @Override
+        public SlotInfoWithUtilization asSlotInfo() {
+            return SlotInfoWithUtilization.from(slotInfo, ignore -> 0d);
+        }
+
+        @Override
+        public long getFreeSince() {
+            return 0;
+        }
+    }
+}

Reply via email to