This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3efbae44b4a21c9735f1a3054c276c32bbc46cdd Author: Weihua Hu <huweihua....@gmail.com> AuthorDate: Wed Jun 21 16:13:50 2023 +0800 [FLINK-31843][runtime] Introduce FreeSlotInfoTracker to manage available slots for SlotSelectionStrategy --- .../slotpool/DefaultFreeSlotInfoTracker.java | 100 +++++++++++ .../jobmaster/slotpool/FreeSlotInfoTracker.java | 86 +++++++++ .../slotpool/DefaultFreeSlotInfoTrackerTest.java | 99 +++++++++++ .../slotpool/FreeSlotInfoTrackerTestUtils.java | 42 +++++ .../slotpool/TestingFreeSlotInfoTracker.java | 192 +++++++++++++++++++++ 5 files changed, 519 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java new file mode 100644 index 00000000000..0a394254992 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTracker.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** Default implements of {@link FreeSlotInfoTracker}. */ +public class DefaultFreeSlotInfoTracker implements FreeSlotInfoTracker { + private final Set<AllocationID> freeSlots; + private final Function<AllocationID, SlotInfo> slotInfoLookup; + private final Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> freeSlotInfoLookup; + private final Function<ResourceID, Double> taskExecutorUtilizationLookup; + + public DefaultFreeSlotInfoTracker( + Set<AllocationID> freeSlots, + Function<AllocationID, SlotInfo> slotInfoLookup, + Function<AllocationID, AllocatedSlotPool.FreeSlotInfo> freeSlotInfoLookup, + Function<ResourceID, Double> taskExecutorUtilizationLookup) { + this.freeSlots = new HashSet<>(freeSlots); + this.slotInfoLookup = slotInfoLookup; + this.freeSlotInfoLookup = freeSlotInfoLookup; + this.taskExecutorUtilizationLookup = taskExecutorUtilizationLookup; + } + + @Override + public Set<AllocationID> getAvailableSlots() { + return Collections.unmodifiableSet(freeSlots); + } + + @Override + public SlotInfo getSlotInfo(AllocationID allocationId) { + return Preconditions.checkNotNull(slotInfoLookup.apply(allocationId)); + } + + @Override + public Collection<AllocatedSlotPool.FreeSlotInfo> getFreeSlotsWithIdleSinceInformation() { + return freeSlots.stream().map(freeSlotInfoLookup).collect(Collectors.toList()); + } + + @Override + public Collection<SlotInfo> getFreeSlotsInformation() { + return freeSlots.stream().map(slotInfoLookup).collect(Collectors.toList()); + } + + @Override + public double getTaskExecutorUtilization(SlotInfo slotInfo) { + ResourceID resourceId = slotInfo.getTaskManagerLocation().getResourceID(); + return taskExecutorUtilizationLookup.apply(resourceId); + } + + @Override + public void reserveSlot(AllocationID allocationId) { + Preconditions.checkState( + freeSlots.remove(allocationId), + "Slot %s does not exist in free slots", + allocationId); + } + + @Override + public DefaultFreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots( + Set<AllocationID> blockedSlots) { + + Set<AllocationID> freeSlotInfoTrackerWithoutBlockedSlots = + freeSlots.stream() + .filter(slot -> !blockedSlots.contains(slot)) + .collect(Collectors.toSet()); + + return new DefaultFreeSlotInfoTracker( + freeSlotInfoTrackerWithoutBlockedSlots, + slotInfoLookup, + freeSlotInfoLookup, + taskExecutorUtilizationLookup); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java new file mode 100644 index 00000000000..b685311fbd9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import java.util.Collection; +import java.util.Set; + +/** Track all free slots, support bookkeeping slot for {@link SlotSelectionStrategy}. */ +public interface FreeSlotInfoTracker { + + /** + * Get allocation id of all available slots. + * + * @return allocation id of available slots + */ + Set<AllocationID> getAvailableSlots(); + + /** + * Get slot info by allocation id, this slot must exist. + * + * @param allocationId to get SlotInfo + * @return slot info for the allocation id + */ + SlotInfo getSlotInfo(AllocationID allocationId); + + /** + * Returns a list of {@link AllocatedSlotPool.FreeSlotInfo} objects about all slots with slot + * idle since that are currently available in the slot pool. + * + * @return a list of {@link AllocatedSlotPool.FreeSlotInfo} objects about all slots with slot + * idle since that are currently available in the slot pool. + */ + Collection<AllocatedSlotPool.FreeSlotInfo> getFreeSlotsWithIdleSinceInformation(); + + /** + * Returns a list of {@link SlotInfo} objects about all slots that are currently available in + * the slot pool. + * + * @return a list of {@link SlotInfo} objects about all slots that are currently available in + * the slot pool. + */ + Collection<SlotInfo> getFreeSlotsInformation(); + + /** + * Get task executor utilization of this slot. + * + * @param slotInfo to get task executor utilization + * @return task executor utilization of this slot + */ + double getTaskExecutorUtilization(SlotInfo slotInfo); + + /** + * Reserve free slot when it is used. + * + * @param allocationId to reserve + */ + void reserveSlot(AllocationID allocationId); + + /** + * Create a new free slot tracker without blocked slots. + * + * @param blockedSlots slots that should not be used + * @return the new free slot tracker + */ + FreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots( + Set<AllocationID> blockedSlots); +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java new file mode 100644 index 00000000000..58218e318f6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultFreeSlotInfoTrackerTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FreeSlotInfoTracker}. */ +class DefaultFreeSlotInfoTrackerTest { + + @Test + void testReserveSlot() { + final ResourceID resourceId = ResourceID.generate(); + final SlotInfo slotInfo1 = createAllocatedSlot(resourceId); + final SlotInfo slotInfo2 = createAllocatedSlot(resourceId); + final Map<AllocationID, SlotInfo> slots = new HashMap<>(); + + slots.put(slotInfo1.getAllocationId(), slotInfo1); + slots.put(slotInfo2.getAllocationId(), slotInfo2); + + final FreeSlotInfoTracker freeSlotInfoTracker = + FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(slots); + for (AllocationID candidate : freeSlotInfoTracker.getAvailableSlots()) { + SlotInfo selectSlot = freeSlotInfoTracker.getSlotInfo(candidate); + assertThat(slots.get(selectSlot.getAllocationId())).isEqualTo(selectSlot); + freeSlotInfoTracker.reserveSlot(selectSlot.getAllocationId()); + break; + } + + assertThat(freeSlotInfoTracker.getAvailableSlots()) + .hasSize(1) + .containsAnyOf(slotInfo1.getAllocationId(), slotInfo2.getAllocationId()); + } + + @Test + void testCreatedFreeSlotInfoTrackerWithoutBlockedSlots() { + final ResourceID resourceId = ResourceID.generate(); + final SlotInfo slotInfo1 = createAllocatedSlot(resourceId); + final SlotInfo slotInfo2 = createAllocatedSlot(resourceId); + final Map<AllocationID, SlotInfo> slots = new HashMap<>(); + + slots.put(slotInfo1.getAllocationId(), slotInfo1); + slots.put(slotInfo2.getAllocationId(), slotInfo2); + + final FreeSlotInfoTracker freeSlotInfoTracker = + FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(slots); + assertThat(freeSlotInfoTracker.getAvailableSlots()).hasSize(2); + + final FreeSlotInfoTracker freeSlotInfoTrackerWithoutBlockedSlots = + freeSlotInfoTracker.createNewFreeSlotInfoTrackerWithoutBlockedSlots( + new HashSet<>( + Arrays.asList( + slotInfo1.getAllocationId(), slotInfo2.getAllocationId()))); + assertThat(freeSlotInfoTrackerWithoutBlockedSlots.getAvailableSlots()).isEmpty(); + } + + private AllocatedSlot createAllocatedSlot(ResourceID owner) { + return new AllocatedSlot( + new AllocationID(), + new TaskManagerLocation(owner, InetAddress.getLoopbackAddress(), 41), + 0, + ResourceProfile.UNKNOWN, + new RpcTaskManagerGateway( + new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(), + JobMasterId.generate())); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java new file mode 100644 index 00000000000..e13835ba687 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTrackerTestUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import java.util.Map; + +/** Utils to create testing {@link FreeSlotInfoTracker}. */ +public class FreeSlotInfoTrackerTestUtils { + /** + * Create default free slot info tracker for provided slots. + * + * @param freeSlots slots to track + * @return default free slot info tracker + */ + public static DefaultFreeSlotInfoTracker createDefaultFreeSlotInfoTracker( + Map<AllocationID, SlotInfo> freeSlots) { + return new DefaultFreeSlotInfoTracker( + freeSlots.keySet(), + freeSlots::get, + id -> new TestingFreeSlotInfoTracker.TestingFreeSlotInfo(freeSlots.get(id)), + ignored -> 0d); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java new file mode 100644 index 00000000000..af55233bb48 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotInfoTracker.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** Testing implements of {@link FreeSlotInfoTracker}. */ +public class TestingFreeSlotInfoTracker implements FreeSlotInfoTracker { + private final Supplier<Set<AllocationID>> getAvailableSlotsSupplier; + private final Function<AllocationID, SlotInfo> getSlotInfoFunction; + private final Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>> + getFreeSlotsWithIdleSinceInformationSupplier; + private final Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier; + private final Function<SlotInfo, Double> getTaskExecutorUtilizationFunction; + private final Consumer<AllocationID> reserveSlotConsumer; + private final Function<Set<AllocationID>, FreeSlotInfoTracker> + createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction; + + public TestingFreeSlotInfoTracker( + Supplier<Set<AllocationID>> getAvailableSlotsSupplier, + Function<AllocationID, SlotInfo> getSlotInfoFunction, + Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>> + getFreeSlotsWithIdleSinceInformationSupplier, + Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier, + Function<SlotInfo, Double> getTaskExecutorUtilizationFunction, + Consumer<AllocationID> reserveSlotConsumer, + Function<Set<AllocationID>, FreeSlotInfoTracker> + createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) { + this.getAvailableSlotsSupplier = getAvailableSlotsSupplier; + this.getSlotInfoFunction = getSlotInfoFunction; + this.getFreeSlotsWithIdleSinceInformationSupplier = + getFreeSlotsWithIdleSinceInformationSupplier; + this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier; + this.getTaskExecutorUtilizationFunction = getTaskExecutorUtilizationFunction; + this.reserveSlotConsumer = reserveSlotConsumer; + this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction = + createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction; + } + + @Override + public Set<AllocationID> getAvailableSlots() { + return getAvailableSlotsSupplier.get(); + } + + @Override + public SlotInfo getSlotInfo(AllocationID allocationId) { + return getSlotInfoFunction.apply(allocationId); + } + + @Override + public Collection<AllocatedSlotPool.FreeSlotInfo> getFreeSlotsWithIdleSinceInformation() { + return getFreeSlotsWithIdleSinceInformationSupplier.get(); + } + + @Override + public Collection<SlotInfo> getFreeSlotsInformation() { + return getFreeSlotsInformationSupplier.get(); + } + + @Override + public double getTaskExecutorUtilization(SlotInfo slotInfo) { + return getTaskExecutorUtilizationFunction.apply(slotInfo); + } + + @Override + public void reserveSlot(AllocationID allocationId) { + reserveSlotConsumer.accept(allocationId); + } + + @Override + public FreeSlotInfoTracker createNewFreeSlotInfoTrackerWithoutBlockedSlots( + Set<AllocationID> blockedSlots) { + return createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction.apply(blockedSlots); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** Builder of {@link TestingFreeSlotInfoTracker}. * */ + public static class Builder { + private Supplier<Set<AllocationID>> getAvailableSlotsSupplier = Collections::emptySet; + private Function<AllocationID, SlotInfo> getSlotInfoFunction = ignored -> null; + private Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>> + getFreeSlotsWithIdleSinceInformationSupplier = Collections::emptyList; + private Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier = + Collections::emptyList; + private Function<SlotInfo, Double> getTaskExecutorUtilizationFunction = ignored -> 0d; + private Consumer<AllocationID> reserveSlotConsumer = ignore -> {}; + private Function<Set<AllocationID>, FreeSlotInfoTracker> + createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction = ignored -> null; + + public Builder setGetAvailableSlotsSupplier( + Supplier<Set<AllocationID>> getAvailableSlotsSupplier) { + this.getAvailableSlotsSupplier = getAvailableSlotsSupplier; + return this; + } + + public Builder setGetSlotInfoFunction( + Function<AllocationID, SlotInfo> getSlotInfoFunction) { + this.getSlotInfoFunction = getSlotInfoFunction; + return this; + } + + public Builder setGetFreeSlotsWithIdleSinceInformationSupplier( + Supplier<Collection<AllocatedSlotPool.FreeSlotInfo>> + getFreeSlotsWithIdleSinceInformationSupplier) { + this.getFreeSlotsWithIdleSinceInformationSupplier = + getFreeSlotsWithIdleSinceInformationSupplier; + return this; + } + + public Builder setGetFreeSlotsInformationSupplier( + Supplier<Collection<SlotInfo>> getFreeSlotsInformationSupplier) { + this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier; + return this; + } + + public Builder setGetTaskExecutorUtilizationFunction( + Function<SlotInfo, Double> getTaskExecutorUtilizationFunction) { + this.getTaskExecutorUtilizationFunction = getTaskExecutorUtilizationFunction; + return this; + } + + public Builder setReserveSlotConsumer(Consumer<AllocationID> reserveSlotConsumer) { + this.reserveSlotConsumer = reserveSlotConsumer; + return this; + } + + public Builder setCreateNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction( + Function<Set<AllocationID>, FreeSlotInfoTracker> + createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) { + this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction = + createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction; + return this; + } + + public TestingFreeSlotInfoTracker build() { + return new TestingFreeSlotInfoTracker( + getAvailableSlotsSupplier, + getSlotInfoFunction, + getFreeSlotsWithIdleSinceInformationSupplier, + getFreeSlotsInformationSupplier, + getTaskExecutorUtilizationFunction, + reserveSlotConsumer, + createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction); + } + } + + /** Testing {@link AllocatedSlotPool.FreeSlotInfo}. */ + public static class TestingFreeSlotInfo implements AllocatedSlotPool.FreeSlotInfo { + private final SlotInfo slotInfo; + + public TestingFreeSlotInfo(SlotInfo slotInfo) { + this.slotInfo = slotInfo; + } + + @Override + public SlotInfoWithUtilization asSlotInfo() { + return SlotInfoWithUtilization.from(slotInfo, ignore -> 0d); + } + + @Override + public long getFreeSince() { + return 0; + } + } +}