This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a24f7717847 [FLINK-34249][runtime] Remove DefaultSlotTracker related logic. a24f7717847 is described below commit a24f7717847ce4e4c511070257e99d7c3f948d2a Author: Roc Marshal <flin...@126.com> AuthorDate: Wed Jan 24 20:25:55 2024 +0800 [FLINK-34249][runtime] Remove DefaultSlotTracker related logic. --- .../slotmanager/DeclarativeTaskManagerSlot.java | 146 --------- .../slotmanager/DefaultSlotTracker.java | 337 --------------------- .../resourcemanager/slotmanager/SlotTracker.java | 112 ------- .../slotmanager/DefaultSlotTrackerTest.java | 332 -------------------- .../slotmanager/SlotStatusReconcilerTest.java | 243 --------------- 5 files changed, 1170 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeTaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeTaskManagerSlot.java deleted file mode 100644 index 6f730d6c780..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeTaskManagerSlot.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager.slotmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nullable; - -/** - * A DeclarativeTaskManagerSlot represents a slot located in a TaskExecutor. It contains the - * necessary information for initiating the allocation of the slot, and keeps track of the state of - * the slot. - */ -class DeclarativeTaskManagerSlot implements TaskManagerSlotInformation { - - /** The unique identification of this slot. */ - private final SlotID slotId; - - /** The resource profile of this slot. */ - private final ResourceProfile resourceProfile; - - /** Gateway to the TaskExecutor which owns the slot. */ - private final TaskExecutorConnection taskManagerConnection; - - /** Job id for which this slot has been allocated. */ - @Nullable private JobID jobId; - - private SlotState state = SlotState.FREE; - - private long allocationStartTimeStamp; - - public DeclarativeTaskManagerSlot( - SlotID slotId, - ResourceProfile resourceProfile, - TaskExecutorConnection taskManagerConnection) { - this.slotId = slotId; - this.resourceProfile = resourceProfile; - this.taskManagerConnection = taskManagerConnection; - } - - @Override - public SlotState getState() { - return state; - } - - @Override - public SlotID getSlotId() { - return slotId; - } - - @Override - public AllocationID getAllocationId() { - throw new UnsupportedOperationException(); - } - - @Override - public ResourceProfile getResourceProfile() { - return resourceProfile; - } - - @Override - public TaskExecutorConnection getTaskManagerConnection() { - return taskManagerConnection; - } - - @Nullable - @Override - public JobID getJobId() { - return jobId; - } - - @Override - public InstanceID getInstanceId() { - return taskManagerConnection.getInstanceID(); - } - - public long getAllocationStartTimestamp() { - return allocationStartTimeStamp; - } - - public void startAllocation(JobID jobId) { - Preconditions.checkState( - state == SlotState.FREE, "Slot must be free to be assigned a slot request."); - - this.jobId = jobId; - this.state = SlotState.PENDING; - this.allocationStartTimeStamp = System.currentTimeMillis(); - } - - public void completeAllocation() { - Preconditions.checkState( - state == SlotState.PENDING, - "In order to complete an allocation, the slot has to be allocated."); - - this.state = SlotState.ALLOCATED; - } - - public void freeSlot() { - Preconditions.checkState( - state == SlotState.PENDING || state == SlotState.ALLOCATED, - "Slot must be allocated or pending before freeing it."); - - this.jobId = null; - this.state = SlotState.FREE; - this.allocationStartTimeStamp = 0; - } - - @Override - public String toString() { - return "DeclarativeTaskManagerSlot{" - + "slotId=" - + slotId - + ", resourceProfile=" - + resourceProfile - + ", taskManagerConnection=" - + taskManagerConnection - + ", jobId=" - + jobId - + ", state=" - + state - + ", allocationStartTimeStamp=" - + allocationStartTimeStamp - + '}'; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java deleted file mode 100644 index 38403fdd661..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager.slotmanager; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; -import org.apache.flink.runtime.taskexecutor.SlotStatus; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -/** Default SlotTracker implementation. */ -public class DefaultSlotTracker implements SlotTracker { - private static final Logger LOG = LoggerFactory.getLogger(DefaultSlotTracker.class); - - /** Map for all registered slots. */ - private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new HashMap<>(); - - /** Index of all currently free slots. */ - private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new LinkedHashMap<>(); - - private final MultiSlotStatusUpdateListener slotStatusUpdateListeners = - new MultiSlotStatusUpdateListener(); - - private final SlotStatusStateReconciler slotStatusStateReconciler = - new SlotStatusStateReconciler( - this::transitionSlotToFree, - this::transitionSlotToPending, - this::transitionSlotToAllocated); - - @Override - public void registerSlotStatusUpdateListener( - SlotStatusUpdateListener slotStatusUpdateListener) { - this.slotStatusUpdateListeners.registerSlotStatusUpdateListener(slotStatusUpdateListener); - } - - @Override - public void addSlot( - SlotID slotId, - ResourceProfile resourceProfile, - TaskExecutorConnection taskManagerConnection, - @Nullable JobID assignedJob) { - Preconditions.checkNotNull(slotId); - Preconditions.checkNotNull(resourceProfile); - Preconditions.checkNotNull(taskManagerConnection); - - if (slots.containsKey(slotId)) { - // remove the old slot first - LOG.debug( - "A slot was added with an already tracked slot ID {}. Removing previous entry.", - slotId); - removeSlot(slotId); - } - - DeclarativeTaskManagerSlot slot = - new DeclarativeTaskManagerSlot(slotId, resourceProfile, taskManagerConnection); - slots.put(slotId, slot); - freeSlots.put(slotId, slot); - slotStatusStateReconciler.executeStateTransition(slot, assignedJob); - } - - @Override - public void removeSlots(Iterable<SlotID> slotsToRemove) { - Preconditions.checkNotNull(slotsToRemove); - - for (SlotID slotId : slotsToRemove) { - removeSlot(slotId); - } - } - - private void removeSlot(SlotID slotId) { - DeclarativeTaskManagerSlot slot = slots.remove(slotId); - - if (slot != null) { - if (slot.getState() != SlotState.FREE) { - transitionSlotToFree(slot); - } - freeSlots.remove(slotId); - } else { - LOG.debug("There was no slot registered with slot id {}.", slotId); - } - } - - // --------------------------------------------------------------------------------------------- - // ResourceManager slot status API - optimistically trigger transitions, but they may not - // represent true state on task executors - // --------------------------------------------------------------------------------------------- - - @Override - public void notifyFree(SlotID slotId) { - Preconditions.checkNotNull(slotId); - transitionSlotToFree(slots.get(slotId)); - } - - @Override - public void notifyAllocationStart(SlotID slotId, JobID jobId) { - Preconditions.checkNotNull(slotId); - Preconditions.checkNotNull(jobId); - transitionSlotToPending(slots.get(slotId), jobId); - } - - @Override - public void notifyAllocationComplete(SlotID slotId, JobID jobId) { - Preconditions.checkNotNull(slotId); - Preconditions.checkNotNull(jobId); - transitionSlotToAllocated(slots.get(slotId), jobId); - } - - // --------------------------------------------------------------------------------------------- - // TaskExecutor slot status API - acts as source of truth - // --------------------------------------------------------------------------------------------- - - @Override - public boolean notifySlotStatus(Iterable<SlotStatus> slotStatuses) { - Preconditions.checkNotNull(slotStatuses); - boolean anyStatusChanged = false; - for (SlotStatus slotStatus : slotStatuses) { - anyStatusChanged |= - slotStatusStateReconciler.executeStateTransition( - slots.get(slotStatus.getSlotID()), slotStatus.getJobID()); - } - return anyStatusChanged; - } - - // --------------------------------------------------------------------------------------------- - // Core state transitions - // --------------------------------------------------------------------------------------------- - - private void transitionSlotToFree(DeclarativeTaskManagerSlot slot) { - Preconditions.checkNotNull(slot); - Preconditions.checkState(slot.getState() != SlotState.FREE); - - // remember the slots current job and state for the notification, as this information will - // be cleared from - // the slot upon freeing - final JobID jobId = slot.getJobId(); - final SlotState state = slot.getState(); - - slot.freeSlot(); - freeSlots.put(slot.getSlotId(), slot); - slotStatusUpdateListeners.notifySlotStatusChange(slot, state, SlotState.FREE, jobId); - } - - private void transitionSlotToPending(DeclarativeTaskManagerSlot slot, JobID jobId) { - Preconditions.checkNotNull(slot); - Preconditions.checkState(slot.getState() == SlotState.FREE); - - slot.startAllocation(jobId); - freeSlots.remove(slot.getSlotId()); - slotStatusUpdateListeners.notifySlotStatusChange( - slot, SlotState.FREE, SlotState.PENDING, jobId); - } - - private void transitionSlotToAllocated(DeclarativeTaskManagerSlot slot, JobID jobId) { - Preconditions.checkNotNull(slot); - Preconditions.checkState( - jobId.equals(slot.getJobId()), - "Job ID from slot status update (%s) does not match currently assigned job ID (%s) for slot %s.", - jobId, - slot.getJobId(), - slot.getSlotId()); - Preconditions.checkState( - slot.getState() == SlotState.PENDING, - "State of slot %s must be %s, but was %s.", - slot.getSlotId(), - SlotState.PENDING, - slot.getState()); - - slot.completeAllocation(); - slotStatusUpdateListeners.notifySlotStatusChange( - slot, SlotState.PENDING, SlotState.ALLOCATED, jobId); - } - - // --------------------------------------------------------------------------------------------- - // Misc - // --------------------------------------------------------------------------------------------- - - @Override - public Collection<TaskManagerSlotInformation> getFreeSlots() { - return Collections.unmodifiableCollection(freeSlots.values()); - } - - @Override - public Collection<TaskExecutorConnection> getTaskExecutorsWithAllocatedSlotsForJob( - JobID jobId) { - final Map<InstanceID, TaskExecutorConnection> taskExecutorConnections = new HashMap<>(); - for (DeclarativeTaskManagerSlot value : slots.values()) { - if (jobId.equals(value.getJobId())) { - taskExecutorConnections.put( - value.getInstanceId(), value.getTaskManagerConnection()); - } - } - return taskExecutorConnections.values(); - } - - @VisibleForTesting - boolean areMapsEmpty() { - return slots.isEmpty() && freeSlots.isEmpty(); - } - - @VisibleForTesting - @Nullable - DeclarativeTaskManagerSlot getSlot(SlotID slotId) { - return slots.get(slotId); - } - - /** - * Slot reports from task executor are the source of truth regarding the state of slots. The - * reported state may not match what is currently being tracked, and if so can contain illegal - * transitions (e.g., from free to allocated). The tracked and reported states are reconciled by - * simulating state transitions that lead us from our currently tracked state to the actual - * reported state. - * - * <p>One exception to the reported state being the source of truth are slots reported as being - * free, but tracked as being pending. This mismatch is assumed to be due to a slot allocation - * RPC not yet having been process by the task executor. This mismatch is hence ignored; it will - * be resolved eventually with the allocation either being completed or timing out. - */ - @VisibleForTesting - static class SlotStatusStateReconciler { - private final Consumer<DeclarativeTaskManagerSlot> toFreeSlot; - private final BiConsumer<DeclarativeTaskManagerSlot, JobID> toPendingSlot; - private final BiConsumer<DeclarativeTaskManagerSlot, JobID> toAllocatedSlot; - - @VisibleForTesting - SlotStatusStateReconciler( - Consumer<DeclarativeTaskManagerSlot> toFreeSlot, - BiConsumer<DeclarativeTaskManagerSlot, JobID> toPendingSlot, - BiConsumer<DeclarativeTaskManagerSlot, JobID> toAllocatedSlot) { - this.toFreeSlot = toFreeSlot; - this.toPendingSlot = toPendingSlot; - this.toAllocatedSlot = toAllocatedSlot; - } - - public boolean executeStateTransition(DeclarativeTaskManagerSlot slot, JobID jobId) { - final SlotState reportedSlotState = - jobId == null ? SlotState.FREE : SlotState.ALLOCATED; - final SlotState trackedSlotState = slot.getState(); - - if (reportedSlotState == SlotState.FREE) { - switch (trackedSlotState) { - case FREE: - // matching state - return false; - case PENDING: - // don't do anything because we expect the slot to be allocated soon - return false; - case ALLOCATED: - toFreeSlot.accept(slot); - return true; - } - } else { - switch (trackedSlotState) { - case FREE: - toPendingSlot.accept(slot, jobId); - toAllocatedSlot.accept(slot, jobId); - return true; - case PENDING: - if (!jobId.equals(slot.getJobId())) { - toFreeSlot.accept(slot); - toPendingSlot.accept(slot, jobId); - } - toAllocatedSlot.accept(slot, jobId); - return true; - case ALLOCATED: - if (!jobId.equals(slot.getJobId())) { - toFreeSlot.accept(slot); - toPendingSlot.accept(slot, jobId); - toAllocatedSlot.accept(slot, jobId); - return true; - } else { - // matching state - return false; - } - } - } - return false; - } - } - - private static class MultiSlotStatusUpdateListener implements SlotStatusUpdateListener { - - private final Collection<SlotStatusUpdateListener> listeners = new ArrayList<>(); - - public void registerSlotStatusUpdateListener( - SlotStatusUpdateListener slotStatusUpdateListener) { - listeners.add(slotStatusUpdateListener); - } - - @Override - public void notifySlotStatusChange( - TaskManagerSlotInformation slot, - SlotState previous, - SlotState current, - JobID jobId) { - LOG.trace( - "Slot {} transitioned from {} to {} for job {}.", - slot.getSlotId(), - previous, - current, - jobId); - listeners.forEach( - listeners -> listeners.notifySlotStatusChange(slot, previous, current, jobId)); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotTracker.java deleted file mode 100644 index 67abe357ea9..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotTracker.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager.slotmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; -import org.apache.flink.runtime.taskexecutor.SlotStatus; - -import javax.annotation.Nullable; - -import java.util.Collection; - -/** Tracks slots and their {@link SlotState}. */ -interface SlotTracker { - - /** - * Registers the given listener with this tracker. - * - * @param slotStatusUpdateListener listener to register - */ - void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener); - - /** - * Adds the given slot to this tracker. The given slot may already be allocated for a job. This - * method must be called before the tracker is notified of any state transition or slot status - * notification. - * - * @param slotId ID of the slot - * @param resourceProfile resource of the slot - * @param taskManagerConnection connection to the hosting task executor - * @param initialJob job that the slot is allocated for, or null if it is free - */ - void addSlot( - SlotID slotId, - ResourceProfile resourceProfile, - TaskExecutorConnection taskManagerConnection, - @Nullable JobID initialJob); - - /** - * Removes the given set of slots from the slot manager. If a removed slot was not free at the - * time of removal, then this method will automatically transition the slot to a free state. - * - * @param slotsToRemove identifying the slots to remove from the slot manager - */ - void removeSlots(Iterable<SlotID> slotsToRemove); - - /** - * Notifies the tracker that the allocation for the given slot, for the given job, has started. - * - * @param slotId slot being allocated - * @param jobId job for which the slot is being allocated - */ - void notifyAllocationStart(SlotID slotId, JobID jobId); - - /** - * Notifies the tracker that the allocation for the given slot, for the given job, has completed - * successfully. - * - * @param slotId slot being allocated - * @param jobId job for which the slot is being allocated - */ - void notifyAllocationComplete(SlotID slotId, JobID jobId); - - /** - * Notifies the tracker that the given slot was freed. - * - * @param slotId slot being freed - */ - void notifyFree(SlotID slotId); - - /** - * Notifies the tracker about the slot statuses. - * - * @param slotStatuses slot statues - * @return whether any slot status has changed - */ - boolean notifySlotStatus(Iterable<SlotStatus> slotStatuses); - - /** - * Returns a view over free slots. The returned collection cannot be modified directly, but - * reflects changes to the set of free slots. - * - * @return free slots - */ - Collection<TaskManagerSlotInformation> getFreeSlots(); - - /** - * Returns all task executors that have at least 1 pending/completed allocation for the given - * job. - * - * @param jobId the job for which the task executors must have a slot - * @return task executors with at least 1 slot for the job - */ - Collection<TaskExecutorConnection> getTaskExecutorsWithAllocatedSlotsForJob(JobID jobId); -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java deleted file mode 100644 index c72adf4e87a..00000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager.slotmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; -import org.apache.flink.runtime.taskexecutor.SlotStatus; -import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; - -import org.junit.jupiter.api.Test; - -import javax.annotation.Nullable; - -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Queue; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** Tests for the {@link DefaultSlotTracker}. */ -class DefaultSlotTrackerTest { - - private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION = - new TaskExecutorConnection( - ResourceID.generate(), - new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway()); - - private static final JobID jobId = new JobID(); - - @Test - public void testFreeSlotsIsEmptyOnInitially() { - SlotTracker tracker = new DefaultSlotTracker(); - - assertThat(tracker.getFreeSlots()).isEmpty(); - } - - @Test - public void testSlotAddition() { - SlotTracker tracker = new DefaultSlotTracker(); - - SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0); - SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1); - - tracker.addSlot(slotId1, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - - assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId)) - .containsExactlyInAnyOrder(slotId1, slotId2); - } - - @Test - public void testSlotRemoval() { - Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>(); - DefaultSlotTracker tracker = new DefaultSlotTracker(); - tracker.registerSlotStatusUpdateListener( - (slot, previous, current, jobId) -> - stateTransitions.add( - new SlotStateTransition( - slot.getSlotId(), previous, current, jobId))); - - SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0); - SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1); - SlotID slotId3 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 2); - - tracker.addSlot(slotId1, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - tracker.addSlot(slotId3, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - - tracker.notifyAllocationStart(slotId2, jobId); - tracker.notifyAllocationStart(slotId3, jobId); - tracker.notifyAllocationComplete(slotId3, jobId); - - // the transitions to this point are not relevant for this test - stateTransitions.clear(); - // we now have 1 slot in each slot state (free, pending, allocated) - // it should be possible to remove slots regardless of their state - tracker.removeSlots(Arrays.asList(slotId1, slotId2, slotId3)); - - assertThat(tracker.getFreeSlots()).isEmpty(); - assertThat(tracker.areMapsEmpty()).isTrue(); - - assertThat(stateTransitions) - .containsExactlyInAnyOrder( - new SlotStateTransition(slotId2, SlotState.PENDING, SlotState.FREE, jobId), - new SlotStateTransition( - slotId3, SlotState.ALLOCATED, SlotState.FREE, jobId)); - } - - @Test - public void testAllocationCompletion() { - Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>(); - SlotTracker tracker = new DefaultSlotTracker(); - tracker.registerSlotStatusUpdateListener( - (slot, previous, current, jobId) -> - stateTransitions.add( - new SlotStateTransition( - slot.getSlotId(), previous, current, jobId))); - - SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0); - - tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - - tracker.notifyAllocationStart(slotId, jobId); - assertThat(tracker.getFreeSlots()).isEmpty(); - assertThat(stateTransitions.remove()) - .isEqualTo( - new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)); - - tracker.notifyAllocationComplete(slotId, jobId); - assertThat(tracker.getFreeSlots()).isEmpty(); - assertThat(stateTransitions.remove()) - .isEqualTo( - new SlotStateTransition( - slotId, SlotState.PENDING, SlotState.ALLOCATED, jobId)); - - tracker.notifyFree(slotId); - - assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId)) - .contains(slotId); - assertThat(stateTransitions.remove()) - .isEqualTo( - new SlotStateTransition( - slotId, SlotState.ALLOCATED, SlotState.FREE, jobId)); - } - - @Test - public void testAllocationCompletionForDifferentJobThrowsIllegalStateException() { - SlotTracker tracker = new DefaultSlotTracker(); - - SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0); - - tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - - tracker.notifyAllocationStart(slotId, new JobID()); - assertThatThrownBy(() -> tracker.notifyAllocationComplete(slotId, new JobID())) - .withFailMessage("Allocations must not be completed for a different job ID.") - .isInstanceOf(IllegalStateException.class); - } - - @Test - public void testAllocationCancellation() { - Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>(); - SlotTracker tracker = new DefaultSlotTracker(); - tracker.registerSlotStatusUpdateListener( - (slot, previous, current, jobId) -> - stateTransitions.add( - new SlotStateTransition( - slot.getSlotId(), previous, current, jobId))); - - SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0); - - tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - - tracker.notifyAllocationStart(slotId, jobId); - assertThat(tracker.getFreeSlots()).isEmpty(); - assertThat(stateTransitions.remove()) - .isEqualTo( - new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)); - - tracker.notifyFree(slotId); - assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId)) - .contains(slotId); - assertThat(stateTransitions.remove()) - .isEqualTo( - new SlotStateTransition(slotId, SlotState.PENDING, SlotState.FREE, jobId)); - } - - /** - * Tests that notifications are fired before the internal state transition has been executed, to - * ensure that components reacting to the status update are in a consistent state with the - * tracker. Note that this test is not conclusive for transitions from PENDING to ALLOCATED, but - * that's okay for now because this distinction isn't exposed anywhere in the API. - */ - @Test - public void testNotificationsFiredAfterStateTransition() { - SlotID slotId = new SlotID(ResourceID.generate(), 0); - - DefaultSlotTracker tracker = new DefaultSlotTracker(); - tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - - tracker.registerSlotStatusUpdateListener( - (slot, previous, current, jobId) -> { - if (current == SlotState.FREE) { - assertThat( - tracker.getFreeSlots().stream() - .map(TaskManagerSlotInformation::getSlotId)) - .contains(slotId); - } else { - assertThat( - tracker.getFreeSlots().stream() - .map(TaskManagerSlotInformation::getSlotId)) - .doesNotContain(slotId); - } - }); - - tracker.notifyAllocationStart(slotId, jobId); - tracker.notifyAllocationComplete(slotId, jobId); - tracker.notifyFree(slotId); - } - - @Test - public void testSlotStatusProcessing() { - SlotTracker tracker = new DefaultSlotTracker(); - SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0); - SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1); - SlotID slotId3 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 2); - tracker.addSlot(slotId1, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - tracker.addSlot(slotId3, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, jobId); - - assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId)) - .containsExactlyInAnyOrder(slotId1, slotId2); - - // move slot2 to PENDING - tracker.notifyAllocationStart(slotId2, jobId); - - final List<SlotStatus> slotReport = - Arrays.asList( - new SlotStatus(slotId1, ResourceProfile.ANY, jobId, new AllocationID()), - new SlotStatus(slotId2, ResourceProfile.ANY, null, new AllocationID()), - new SlotStatus(slotId3, ResourceProfile.ANY, null, new AllocationID())); - - assertThat(tracker.notifySlotStatus(slotReport)).isTrue(); - - // slot1 should now be allocated; slot2 should continue to be in a pending state; slot3 - // should be freed - assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId)) - .contains(slotId3); - - // if slot2 is not in a pending state, this will fail with an exception - tracker.notifyAllocationComplete(slotId2, jobId); - - final List<SlotStatus> idempotentSlotReport = - Arrays.asList( - new SlotStatus(slotId1, ResourceProfile.ANY, jobId, new AllocationID()), - new SlotStatus(slotId2, ResourceProfile.ANY, jobId, new AllocationID()), - new SlotStatus(slotId3, ResourceProfile.ANY, null, new AllocationID())); - - assertThat(tracker.notifySlotStatus(idempotentSlotReport)).isFalse(); - } - - @Test - public void testGetTaskExecutorsWithAllocatedSlotsForJob() { - final SlotTracker tracker = new DefaultSlotTracker(); - - final JobID jobId = new JobID(); - final SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0); - - assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID())).isEmpty(); - - tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null); - assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID())).isEmpty(); - - tracker.notifyAllocationStart(slotId, jobId); - assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId)) - .contains(TASK_EXECUTOR_CONNECTION); - - tracker.notifyAllocationComplete(slotId, jobId); - assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId)) - .contains(TASK_EXECUTOR_CONNECTION); - - tracker.notifyFree(slotId); - assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID())).isEmpty(); - } - - private static class SlotStateTransition { - - private final SlotID slotId; - private final SlotState oldState; - private final SlotState newState; - @Nullable private final JobID jobId; - - private SlotStateTransition( - SlotID slotId, SlotState oldState, SlotState newState, @Nullable JobID jobId) { - this.slotId = slotId; - this.jobId = jobId; - this.oldState = oldState; - this.newState = newState; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SlotStateTransition that = (SlotStateTransition) o; - return Objects.equals(slotId, that.slotId) - && oldState == that.oldState - && newState == that.newState - && Objects.equals(jobId, that.jobId); - } - - @Override - public String toString() { - return "SlotStateTransition{" - + "slotId=" - + slotId - + ", oldState=" - + oldState - + ", newState=" - + newState - + ", jobId=" - + jobId - + '}'; - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java deleted file mode 100644 index e697cb1e348..00000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager.slotmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; -import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; - -import org.assertj.core.api.Condition; -import org.junit.jupiter.api.Test; - -import javax.annotation.Nullable; - -import java.util.ArrayDeque; -import java.util.Queue; - -import static org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusReconcilerTest.SlotStateTransitionMatcher.ofMatcher; -import static org.assertj.core.api.Assertions.assertThat; - -/** - * Tests for the {@link DefaultSlotTracker.SlotStatusStateReconciler}. Tests all state transitions - * that could (or should not) occur due to a slot status update. This test only checks the target - * state and job ID for state transitions, because the slot ID is not interesting and the slot state - * is not *actually* being updated. We assume the reconciler locks in a set of transitions given a - * source and target state, without worrying about the correctness of intermediate steps (because it - * shouldn't; and it would be a bit annoying to setup). - */ -class SlotStatusReconcilerTest { - - private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION = - new TaskExecutorConnection( - ResourceID.generate(), - new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway()); - - @Test - void testSlotStatusReconciliationForFreeSlot() { - JobID jobId1 = new JobID(); - StateTransitionTracker stateTransitionTracker = new StateTransitionTracker(); - - DefaultSlotTracker.SlotStatusStateReconciler reconciler = - createSlotStatusReconciler(stateTransitionTracker); - - DeclarativeTaskManagerSlot slot = - new DeclarativeTaskManagerSlot( - new SlotID(ResourceID.generate(), 0), - ResourceProfile.ANY, - TASK_EXECUTOR_CONNECTION); - - // free -> free - assertThat(reconciler.executeStateTransition(slot, null)).isFalse(); - assertThat(stateTransitionTracker.stateTransitions).isEmpty(); - - // free -> allocated - assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue(); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.PENDING, jobId1)); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1)); - } - - @Test - void testSlotStatusReconciliationForPendingSlot() { - JobID jobId1 = new JobID(); - StateTransitionTracker stateTransitionTracker = new StateTransitionTracker(); - - DefaultSlotTracker.SlotStatusStateReconciler reconciler = - createSlotStatusReconciler(stateTransitionTracker); - - DeclarativeTaskManagerSlot slot = - new DeclarativeTaskManagerSlot( - new SlotID(ResourceID.generate(), 0), - ResourceProfile.ANY, - TASK_EXECUTOR_CONNECTION); - slot.startAllocation(jobId1); - - // pending vs. free; should not trigger any transition because we are expecting a slot - // allocation in the future - assertThat(reconciler.executeStateTransition(slot, null)).isFalse(); - assertThat(stateTransitionTracker.stateTransitions).isEmpty(); - - // pending -> allocated - assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue(); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1)); - } - - @Test - void testSlotStatusReconciliationForPendingSlotWithDifferentJobID() { - JobID jobId1 = new JobID(); - JobID jobId2 = new JobID(); - StateTransitionTracker stateTransitionTracker = new StateTransitionTracker(); - - DefaultSlotTracker.SlotStatusStateReconciler reconciler = - createSlotStatusReconciler(stateTransitionTracker); - - DeclarativeTaskManagerSlot slot = - new DeclarativeTaskManagerSlot( - new SlotID(ResourceID.generate(), 0), - ResourceProfile.ANY, - TASK_EXECUTOR_CONNECTION); - slot.startAllocation(jobId1); - - // pending(job1) -> free -> pending(job2) -> allocated(job2) - assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue(); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.FREE, jobId1)); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.PENDING, jobId2)); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2)); - } - - @Test - void testSlotStatusReconciliationForAllocatedSlot() { - JobID jobId1 = new JobID(); - StateTransitionTracker stateTransitionTracker = new StateTransitionTracker(); - - DefaultSlotTracker.SlotStatusStateReconciler reconciler = - createSlotStatusReconciler(stateTransitionTracker); - - DeclarativeTaskManagerSlot slot = - new DeclarativeTaskManagerSlot( - new SlotID(ResourceID.generate(), 0), - ResourceProfile.ANY, - TASK_EXECUTOR_CONNECTION); - slot.startAllocation(jobId1); - slot.completeAllocation(); - - // allocated -> allocated - assertThat(reconciler.executeStateTransition(slot, jobId1)).isFalse(); - assertThat(stateTransitionTracker.stateTransitions).isEmpty(); - - // allocated -> free - assertThat(reconciler.executeStateTransition(slot, null)).isTrue(); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.FREE, jobId1)); - } - - @Test - void testSlotStatusReconciliationForAllocatedSlotWithDifferentJobID() { - JobID jobId1 = new JobID(); - JobID jobId2 = new JobID(); - StateTransitionTracker stateTransitionTracker = new StateTransitionTracker(); - - DefaultSlotTracker.SlotStatusStateReconciler reconciler = - createSlotStatusReconciler(stateTransitionTracker); - - DeclarativeTaskManagerSlot slot = - new DeclarativeTaskManagerSlot( - new SlotID(ResourceID.generate(), 0), - ResourceProfile.ANY, - TASK_EXECUTOR_CONNECTION); - slot.startAllocation(jobId1); - slot.completeAllocation(); - - // allocated(job1) -> free -> pending(job2) -> allocated(job2) - assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue(); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.FREE, jobId1)); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.PENDING, jobId2)); - assertThat(stateTransitionTracker.stateTransitions.remove()) - .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2)); - } - - private static class StateTransitionTracker { - Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>(); - - void notifyFree(DeclarativeTaskManagerSlot slot) { - stateTransitions.add(new SlotStateTransition(SlotState.FREE, slot.getJobId())); - } - - void notifyPending(JobID jobId) { - stateTransitions.add(new SlotStateTransition(SlotState.PENDING, jobId)); - } - - void notifyAllocated(JobID jobId) { - stateTransitions.add(new SlotStateTransition(SlotState.ALLOCATED, jobId)); - } - } - - private static DefaultSlotTracker.SlotStatusStateReconciler createSlotStatusReconciler( - StateTransitionTracker stateTransitionTracker) { - return new DefaultSlotTracker.SlotStatusStateReconciler( - stateTransitionTracker::notifyFree, - (jobId, jobId2) -> stateTransitionTracker.notifyPending(jobId2), - (jobId1, jobId12) -> stateTransitionTracker.notifyAllocated(jobId12)); - } - - static class SlotStateTransition { - - private final SlotState newState; - @Nullable private final JobID jobId; - - private SlotStateTransition(SlotState newState, @Nullable JobID jobId) { - this.jobId = jobId; - this.newState = newState; - } - - @Override - public String toString() { - return "SlotStateTransition{" + ", newState=" + newState + ", jobId=" + jobId + '}'; - } - } - - static class SlotStateTransitionMatcher extends Condition<SlotStateTransition> { - - private final SlotState targetState; - private final JobID jobId; - - private SlotStateTransitionMatcher(SlotState targetState, JobID jobId) { - this.targetState = targetState; - this.jobId = jobId; - } - - @Override - public boolean matches(SlotStateTransition value) { - return value.newState == targetState && jobId.equals(value.jobId); - } - - static SlotStateTransitionMatcher ofMatcher(SlotState targetState, JobID jobId) { - return new SlotStateTransitionMatcher(targetState, jobId); - } - } -}