This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a24f7717847 [FLINK-34249][runtime] Remove DefaultSlotTracker related 
logic.
a24f7717847 is described below

commit a24f7717847ce4e4c511070257e99d7c3f948d2a
Author: Roc Marshal <flin...@126.com>
AuthorDate: Wed Jan 24 20:25:55 2024 +0800

    [FLINK-34249][runtime] Remove DefaultSlotTracker related logic.
---
 .../slotmanager/DeclarativeTaskManagerSlot.java    | 146 ---------
 .../slotmanager/DefaultSlotTracker.java            | 337 ---------------------
 .../resourcemanager/slotmanager/SlotTracker.java   | 112 -------
 .../slotmanager/DefaultSlotTrackerTest.java        | 332 --------------------
 .../slotmanager/SlotStatusReconcilerTest.java      | 243 ---------------
 5 files changed, 1170 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeTaskManagerSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeTaskManagerSlot.java
deleted file mode 100644
index 6f730d6c780..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeTaskManagerSlot.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-/**
- * A DeclarativeTaskManagerSlot represents a slot located in a TaskExecutor. 
It contains the
- * necessary information for initiating the allocation of the slot, and keeps 
track of the state of
- * the slot.
- */
-class DeclarativeTaskManagerSlot implements TaskManagerSlotInformation {
-
-    /** The unique identification of this slot. */
-    private final SlotID slotId;
-
-    /** The resource profile of this slot. */
-    private final ResourceProfile resourceProfile;
-
-    /** Gateway to the TaskExecutor which owns the slot. */
-    private final TaskExecutorConnection taskManagerConnection;
-
-    /** Job id for which this slot has been allocated. */
-    @Nullable private JobID jobId;
-
-    private SlotState state = SlotState.FREE;
-
-    private long allocationStartTimeStamp;
-
-    public DeclarativeTaskManagerSlot(
-            SlotID slotId,
-            ResourceProfile resourceProfile,
-            TaskExecutorConnection taskManagerConnection) {
-        this.slotId = slotId;
-        this.resourceProfile = resourceProfile;
-        this.taskManagerConnection = taskManagerConnection;
-    }
-
-    @Override
-    public SlotState getState() {
-        return state;
-    }
-
-    @Override
-    public SlotID getSlotId() {
-        return slotId;
-    }
-
-    @Override
-    public AllocationID getAllocationId() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ResourceProfile getResourceProfile() {
-        return resourceProfile;
-    }
-
-    @Override
-    public TaskExecutorConnection getTaskManagerConnection() {
-        return taskManagerConnection;
-    }
-
-    @Nullable
-    @Override
-    public JobID getJobId() {
-        return jobId;
-    }
-
-    @Override
-    public InstanceID getInstanceId() {
-        return taskManagerConnection.getInstanceID();
-    }
-
-    public long getAllocationStartTimestamp() {
-        return allocationStartTimeStamp;
-    }
-
-    public void startAllocation(JobID jobId) {
-        Preconditions.checkState(
-                state == SlotState.FREE, "Slot must be free to be assigned a 
slot request.");
-
-        this.jobId = jobId;
-        this.state = SlotState.PENDING;
-        this.allocationStartTimeStamp = System.currentTimeMillis();
-    }
-
-    public void completeAllocation() {
-        Preconditions.checkState(
-                state == SlotState.PENDING,
-                "In order to complete an allocation, the slot has to be 
allocated.");
-
-        this.state = SlotState.ALLOCATED;
-    }
-
-    public void freeSlot() {
-        Preconditions.checkState(
-                state == SlotState.PENDING || state == SlotState.ALLOCATED,
-                "Slot must be allocated or pending before freeing it.");
-
-        this.jobId = null;
-        this.state = SlotState.FREE;
-        this.allocationStartTimeStamp = 0;
-    }
-
-    @Override
-    public String toString() {
-        return "DeclarativeTaskManagerSlot{"
-                + "slotId="
-                + slotId
-                + ", resourceProfile="
-                + resourceProfile
-                + ", taskManagerConnection="
-                + taskManagerConnection
-                + ", jobId="
-                + jobId
-                + ", state="
-                + state
-                + ", allocationStartTimeStamp="
-                + allocationStartTimeStamp
-                + '}';
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
deleted file mode 100644
index 38403fdd661..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-
-/** Default SlotTracker implementation. */
-public class DefaultSlotTracker implements SlotTracker {
-    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSlotTracker.class);
-
-    /** Map for all registered slots. */
-    private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new 
HashMap<>();
-
-    /** Index of all currently free slots. */
-    private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new 
LinkedHashMap<>();
-
-    private final MultiSlotStatusUpdateListener slotStatusUpdateListeners =
-            new MultiSlotStatusUpdateListener();
-
-    private final SlotStatusStateReconciler slotStatusStateReconciler =
-            new SlotStatusStateReconciler(
-                    this::transitionSlotToFree,
-                    this::transitionSlotToPending,
-                    this::transitionSlotToAllocated);
-
-    @Override
-    public void registerSlotStatusUpdateListener(
-            SlotStatusUpdateListener slotStatusUpdateListener) {
-        
this.slotStatusUpdateListeners.registerSlotStatusUpdateListener(slotStatusUpdateListener);
-    }
-
-    @Override
-    public void addSlot(
-            SlotID slotId,
-            ResourceProfile resourceProfile,
-            TaskExecutorConnection taskManagerConnection,
-            @Nullable JobID assignedJob) {
-        Preconditions.checkNotNull(slotId);
-        Preconditions.checkNotNull(resourceProfile);
-        Preconditions.checkNotNull(taskManagerConnection);
-
-        if (slots.containsKey(slotId)) {
-            // remove the old slot first
-            LOG.debug(
-                    "A slot was added with an already tracked slot ID {}. 
Removing previous entry.",
-                    slotId);
-            removeSlot(slotId);
-        }
-
-        DeclarativeTaskManagerSlot slot =
-                new DeclarativeTaskManagerSlot(slotId, resourceProfile, 
taskManagerConnection);
-        slots.put(slotId, slot);
-        freeSlots.put(slotId, slot);
-        slotStatusStateReconciler.executeStateTransition(slot, assignedJob);
-    }
-
-    @Override
-    public void removeSlots(Iterable<SlotID> slotsToRemove) {
-        Preconditions.checkNotNull(slotsToRemove);
-
-        for (SlotID slotId : slotsToRemove) {
-            removeSlot(slotId);
-        }
-    }
-
-    private void removeSlot(SlotID slotId) {
-        DeclarativeTaskManagerSlot slot = slots.remove(slotId);
-
-        if (slot != null) {
-            if (slot.getState() != SlotState.FREE) {
-                transitionSlotToFree(slot);
-            }
-            freeSlots.remove(slotId);
-        } else {
-            LOG.debug("There was no slot registered with slot id {}.", slotId);
-        }
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-    // ResourceManager slot status API - optimistically trigger transitions, 
but they may not
-    // represent true state on task executors
-    // 
---------------------------------------------------------------------------------------------
-
-    @Override
-    public void notifyFree(SlotID slotId) {
-        Preconditions.checkNotNull(slotId);
-        transitionSlotToFree(slots.get(slotId));
-    }
-
-    @Override
-    public void notifyAllocationStart(SlotID slotId, JobID jobId) {
-        Preconditions.checkNotNull(slotId);
-        Preconditions.checkNotNull(jobId);
-        transitionSlotToPending(slots.get(slotId), jobId);
-    }
-
-    @Override
-    public void notifyAllocationComplete(SlotID slotId, JobID jobId) {
-        Preconditions.checkNotNull(slotId);
-        Preconditions.checkNotNull(jobId);
-        transitionSlotToAllocated(slots.get(slotId), jobId);
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-    // TaskExecutor slot status API - acts as source of truth
-    // 
---------------------------------------------------------------------------------------------
-
-    @Override
-    public boolean notifySlotStatus(Iterable<SlotStatus> slotStatuses) {
-        Preconditions.checkNotNull(slotStatuses);
-        boolean anyStatusChanged = false;
-        for (SlotStatus slotStatus : slotStatuses) {
-            anyStatusChanged |=
-                    slotStatusStateReconciler.executeStateTransition(
-                            slots.get(slotStatus.getSlotID()), 
slotStatus.getJobID());
-        }
-        return anyStatusChanged;
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-    // Core state transitions
-    // 
---------------------------------------------------------------------------------------------
-
-    private void transitionSlotToFree(DeclarativeTaskManagerSlot slot) {
-        Preconditions.checkNotNull(slot);
-        Preconditions.checkState(slot.getState() != SlotState.FREE);
-
-        // remember the slots current job and state for the notification, as 
this information will
-        // be cleared from
-        // the slot upon freeing
-        final JobID jobId = slot.getJobId();
-        final SlotState state = slot.getState();
-
-        slot.freeSlot();
-        freeSlots.put(slot.getSlotId(), slot);
-        slotStatusUpdateListeners.notifySlotStatusChange(slot, state, 
SlotState.FREE, jobId);
-    }
-
-    private void transitionSlotToPending(DeclarativeTaskManagerSlot slot, 
JobID jobId) {
-        Preconditions.checkNotNull(slot);
-        Preconditions.checkState(slot.getState() == SlotState.FREE);
-
-        slot.startAllocation(jobId);
-        freeSlots.remove(slot.getSlotId());
-        slotStatusUpdateListeners.notifySlotStatusChange(
-                slot, SlotState.FREE, SlotState.PENDING, jobId);
-    }
-
-    private void transitionSlotToAllocated(DeclarativeTaskManagerSlot slot, 
JobID jobId) {
-        Preconditions.checkNotNull(slot);
-        Preconditions.checkState(
-                jobId.equals(slot.getJobId()),
-                "Job ID from slot status update (%s) does not match currently 
assigned job ID (%s) for slot %s.",
-                jobId,
-                slot.getJobId(),
-                slot.getSlotId());
-        Preconditions.checkState(
-                slot.getState() == SlotState.PENDING,
-                "State of slot %s must be %s, but was %s.",
-                slot.getSlotId(),
-                SlotState.PENDING,
-                slot.getState());
-
-        slot.completeAllocation();
-        slotStatusUpdateListeners.notifySlotStatusChange(
-                slot, SlotState.PENDING, SlotState.ALLOCATED, jobId);
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-    // Misc
-    // 
---------------------------------------------------------------------------------------------
-
-    @Override
-    public Collection<TaskManagerSlotInformation> getFreeSlots() {
-        return Collections.unmodifiableCollection(freeSlots.values());
-    }
-
-    @Override
-    public Collection<TaskExecutorConnection> 
getTaskExecutorsWithAllocatedSlotsForJob(
-            JobID jobId) {
-        final Map<InstanceID, TaskExecutorConnection> taskExecutorConnections 
= new HashMap<>();
-        for (DeclarativeTaskManagerSlot value : slots.values()) {
-            if (jobId.equals(value.getJobId())) {
-                taskExecutorConnections.put(
-                        value.getInstanceId(), 
value.getTaskManagerConnection());
-            }
-        }
-        return taskExecutorConnections.values();
-    }
-
-    @VisibleForTesting
-    boolean areMapsEmpty() {
-        return slots.isEmpty() && freeSlots.isEmpty();
-    }
-
-    @VisibleForTesting
-    @Nullable
-    DeclarativeTaskManagerSlot getSlot(SlotID slotId) {
-        return slots.get(slotId);
-    }
-
-    /**
-     * Slot reports from task executor are the source of truth regarding the 
state of slots. The
-     * reported state may not match what is currently being tracked, and if so 
can contain illegal
-     * transitions (e.g., from free to allocated). The tracked and reported 
states are reconciled by
-     * simulating state transitions that lead us from our currently tracked 
state to the actual
-     * reported state.
-     *
-     * <p>One exception to the reported state being the source of truth are 
slots reported as being
-     * free, but tracked as being pending. This mismatch is assumed to be due 
to a slot allocation
-     * RPC not yet having been process by the task executor. This mismatch is 
hence ignored; it will
-     * be resolved eventually with the allocation either being completed or 
timing out.
-     */
-    @VisibleForTesting
-    static class SlotStatusStateReconciler {
-        private final Consumer<DeclarativeTaskManagerSlot> toFreeSlot;
-        private final BiConsumer<DeclarativeTaskManagerSlot, JobID> 
toPendingSlot;
-        private final BiConsumer<DeclarativeTaskManagerSlot, JobID> 
toAllocatedSlot;
-
-        @VisibleForTesting
-        SlotStatusStateReconciler(
-                Consumer<DeclarativeTaskManagerSlot> toFreeSlot,
-                BiConsumer<DeclarativeTaskManagerSlot, JobID> toPendingSlot,
-                BiConsumer<DeclarativeTaskManagerSlot, JobID> toAllocatedSlot) 
{
-            this.toFreeSlot = toFreeSlot;
-            this.toPendingSlot = toPendingSlot;
-            this.toAllocatedSlot = toAllocatedSlot;
-        }
-
-        public boolean executeStateTransition(DeclarativeTaskManagerSlot slot, 
JobID jobId) {
-            final SlotState reportedSlotState =
-                    jobId == null ? SlotState.FREE : SlotState.ALLOCATED;
-            final SlotState trackedSlotState = slot.getState();
-
-            if (reportedSlotState == SlotState.FREE) {
-                switch (trackedSlotState) {
-                    case FREE:
-                        // matching state
-                        return false;
-                    case PENDING:
-                        // don't do anything because we expect the slot to be 
allocated soon
-                        return false;
-                    case ALLOCATED:
-                        toFreeSlot.accept(slot);
-                        return true;
-                }
-            } else {
-                switch (trackedSlotState) {
-                    case FREE:
-                        toPendingSlot.accept(slot, jobId);
-                        toAllocatedSlot.accept(slot, jobId);
-                        return true;
-                    case PENDING:
-                        if (!jobId.equals(slot.getJobId())) {
-                            toFreeSlot.accept(slot);
-                            toPendingSlot.accept(slot, jobId);
-                        }
-                        toAllocatedSlot.accept(slot, jobId);
-                        return true;
-                    case ALLOCATED:
-                        if (!jobId.equals(slot.getJobId())) {
-                            toFreeSlot.accept(slot);
-                            toPendingSlot.accept(slot, jobId);
-                            toAllocatedSlot.accept(slot, jobId);
-                            return true;
-                        } else {
-                            // matching state
-                            return false;
-                        }
-                }
-            }
-            return false;
-        }
-    }
-
-    private static class MultiSlotStatusUpdateListener implements 
SlotStatusUpdateListener {
-
-        private final Collection<SlotStatusUpdateListener> listeners = new 
ArrayList<>();
-
-        public void registerSlotStatusUpdateListener(
-                SlotStatusUpdateListener slotStatusUpdateListener) {
-            listeners.add(slotStatusUpdateListener);
-        }
-
-        @Override
-        public void notifySlotStatusChange(
-                TaskManagerSlotInformation slot,
-                SlotState previous,
-                SlotState current,
-                JobID jobId) {
-            LOG.trace(
-                    "Slot {} transitioned from {} to {} for job {}.",
-                    slot.getSlotId(),
-                    previous,
-                    current,
-                    jobId);
-            listeners.forEach(
-                    listeners -> listeners.notifySlotStatusChange(slot, 
previous, current, jobId));
-        }
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotTracker.java
deleted file mode 100644
index 67abe357ea9..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotTracker.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-
-import javax.annotation.Nullable;
-
-import java.util.Collection;
-
-/** Tracks slots and their {@link SlotState}. */
-interface SlotTracker {
-
-    /**
-     * Registers the given listener with this tracker.
-     *
-     * @param slotStatusUpdateListener listener to register
-     */
-    void registerSlotStatusUpdateListener(SlotStatusUpdateListener 
slotStatusUpdateListener);
-
-    /**
-     * Adds the given slot to this tracker. The given slot may already be 
allocated for a job. This
-     * method must be called before the tracker is notified of any state 
transition or slot status
-     * notification.
-     *
-     * @param slotId ID of the slot
-     * @param resourceProfile resource of the slot
-     * @param taskManagerConnection connection to the hosting task executor
-     * @param initialJob job that the slot is allocated for, or null if it is 
free
-     */
-    void addSlot(
-            SlotID slotId,
-            ResourceProfile resourceProfile,
-            TaskExecutorConnection taskManagerConnection,
-            @Nullable JobID initialJob);
-
-    /**
-     * Removes the given set of slots from the slot manager. If a removed slot 
was not free at the
-     * time of removal, then this method will automatically transition the 
slot to a free state.
-     *
-     * @param slotsToRemove identifying the slots to remove from the slot 
manager
-     */
-    void removeSlots(Iterable<SlotID> slotsToRemove);
-
-    /**
-     * Notifies the tracker that the allocation for the given slot, for the 
given job, has started.
-     *
-     * @param slotId slot being allocated
-     * @param jobId job for which the slot is being allocated
-     */
-    void notifyAllocationStart(SlotID slotId, JobID jobId);
-
-    /**
-     * Notifies the tracker that the allocation for the given slot, for the 
given job, has completed
-     * successfully.
-     *
-     * @param slotId slot being allocated
-     * @param jobId job for which the slot is being allocated
-     */
-    void notifyAllocationComplete(SlotID slotId, JobID jobId);
-
-    /**
-     * Notifies the tracker that the given slot was freed.
-     *
-     * @param slotId slot being freed
-     */
-    void notifyFree(SlotID slotId);
-
-    /**
-     * Notifies the tracker about the slot statuses.
-     *
-     * @param slotStatuses slot statues
-     * @return whether any slot status has changed
-     */
-    boolean notifySlotStatus(Iterable<SlotStatus> slotStatuses);
-
-    /**
-     * Returns a view over free slots. The returned collection cannot be 
modified directly, but
-     * reflects changes to the set of free slots.
-     *
-     * @return free slots
-     */
-    Collection<TaskManagerSlotInformation> getFreeSlots();
-
-    /**
-     * Returns all task executors that have at least 1 pending/completed 
allocation for the given
-     * job.
-     *
-     * @param jobId the job for which the task executors must have a slot
-     * @return task executors with at least 1 slot for the job
-     */
-    Collection<TaskExecutorConnection> 
getTaskExecutorsWithAllocatedSlotsForJob(JobID jobId);
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
deleted file mode 100644
index c72adf4e87a..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-
-import org.junit.jupiter.api.Test;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Queue;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Tests for the {@link DefaultSlotTracker}. */
-class DefaultSlotTrackerTest {
-
-    private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
-            new TaskExecutorConnection(
-                    ResourceID.generate(),
-                    new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
-
-    private static final JobID jobId = new JobID();
-
-    @Test
-    public void testFreeSlotsIsEmptyOnInitially() {
-        SlotTracker tracker = new DefaultSlotTracker();
-
-        assertThat(tracker.getFreeSlots()).isEmpty();
-    }
-
-    @Test
-    public void testSlotAddition() {
-        SlotTracker tracker = new DefaultSlotTracker();
-
-        SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
0);
-        SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
1);
-
-        tracker.addSlot(slotId1, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
-        tracker.addSlot(slotId2, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
-
-        
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
-                .containsExactlyInAnyOrder(slotId1, slotId2);
-    }
-
-    @Test
-    public void testSlotRemoval() {
-        Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
-        DefaultSlotTracker tracker = new DefaultSlotTracker();
-        tracker.registerSlotStatusUpdateListener(
-                (slot, previous, current, jobId) ->
-                        stateTransitions.add(
-                                new SlotStateTransition(
-                                        slot.getSlotId(), previous, current, 
jobId)));
-
-        SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
0);
-        SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
1);
-        SlotID slotId3 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
2);
-
-        tracker.addSlot(slotId1, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
-        tracker.addSlot(slotId2, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
-        tracker.addSlot(slotId3, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
-
-        tracker.notifyAllocationStart(slotId2, jobId);
-        tracker.notifyAllocationStart(slotId3, jobId);
-        tracker.notifyAllocationComplete(slotId3, jobId);
-
-        // the transitions to this point are not relevant for this test
-        stateTransitions.clear();
-        // we now have 1 slot in each slot state (free, pending, allocated)
-        // it should be possible to remove slots regardless of their state
-        tracker.removeSlots(Arrays.asList(slotId1, slotId2, slotId3));
-
-        assertThat(tracker.getFreeSlots()).isEmpty();
-        assertThat(tracker.areMapsEmpty()).isTrue();
-
-        assertThat(stateTransitions)
-                .containsExactlyInAnyOrder(
-                        new SlotStateTransition(slotId2, SlotState.PENDING, 
SlotState.FREE, jobId),
-                        new SlotStateTransition(
-                                slotId3, SlotState.ALLOCATED, SlotState.FREE, 
jobId));
-    }
-
-    @Test
-    public void testAllocationCompletion() {
-        Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
-        SlotTracker tracker = new DefaultSlotTracker();
-        tracker.registerSlotStatusUpdateListener(
-                (slot, previous, current, jobId) ->
-                        stateTransitions.add(
-                                new SlotStateTransition(
-                                        slot.getSlotId(), previous, current, 
jobId)));
-
-        SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
0);
-
-        tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, 
null);
-
-        tracker.notifyAllocationStart(slotId, jobId);
-        assertThat(tracker.getFreeSlots()).isEmpty();
-        assertThat(stateTransitions.remove())
-                .isEqualTo(
-                        new SlotStateTransition(slotId, SlotState.FREE, 
SlotState.PENDING, jobId));
-
-        tracker.notifyAllocationComplete(slotId, jobId);
-        assertThat(tracker.getFreeSlots()).isEmpty();
-        assertThat(stateTransitions.remove())
-                .isEqualTo(
-                        new SlotStateTransition(
-                                slotId, SlotState.PENDING, 
SlotState.ALLOCATED, jobId));
-
-        tracker.notifyFree(slotId);
-
-        
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
-                .contains(slotId);
-        assertThat(stateTransitions.remove())
-                .isEqualTo(
-                        new SlotStateTransition(
-                                slotId, SlotState.ALLOCATED, SlotState.FREE, 
jobId));
-    }
-
-    @Test
-    public void 
testAllocationCompletionForDifferentJobThrowsIllegalStateException() {
-        SlotTracker tracker = new DefaultSlotTracker();
-
-        SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
0);
-
-        tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, 
null);
-
-        tracker.notifyAllocationStart(slotId, new JobID());
-        assertThatThrownBy(() -> tracker.notifyAllocationComplete(slotId, new 
JobID()))
-                .withFailMessage("Allocations must not be completed for a 
different job ID.")
-                .isInstanceOf(IllegalStateException.class);
-    }
-
-    @Test
-    public void testAllocationCancellation() {
-        Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
-        SlotTracker tracker = new DefaultSlotTracker();
-        tracker.registerSlotStatusUpdateListener(
-                (slot, previous, current, jobId) ->
-                        stateTransitions.add(
-                                new SlotStateTransition(
-                                        slot.getSlotId(), previous, current, 
jobId)));
-
-        SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
0);
-
-        tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, 
null);
-
-        tracker.notifyAllocationStart(slotId, jobId);
-        assertThat(tracker.getFreeSlots()).isEmpty();
-        assertThat(stateTransitions.remove())
-                .isEqualTo(
-                        new SlotStateTransition(slotId, SlotState.FREE, 
SlotState.PENDING, jobId));
-
-        tracker.notifyFree(slotId);
-        
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
-                .contains(slotId);
-        assertThat(stateTransitions.remove())
-                .isEqualTo(
-                        new SlotStateTransition(slotId, SlotState.PENDING, 
SlotState.FREE, jobId));
-    }
-
-    /**
-     * Tests that notifications are fired before the internal state transition 
has been executed, to
-     * ensure that components reacting to the status update are in a 
consistent state with the
-     * tracker. Note that this test is not conclusive for transitions from 
PENDING to ALLOCATED, but
-     * that's okay for now because this distinction isn't exposed anywhere in 
the API.
-     */
-    @Test
-    public void testNotificationsFiredAfterStateTransition() {
-        SlotID slotId = new SlotID(ResourceID.generate(), 0);
-
-        DefaultSlotTracker tracker = new DefaultSlotTracker();
-        tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, 
null);
-
-        tracker.registerSlotStatusUpdateListener(
-                (slot, previous, current, jobId) -> {
-                    if (current == SlotState.FREE) {
-                        assertThat(
-                                        tracker.getFreeSlots().stream()
-                                                
.map(TaskManagerSlotInformation::getSlotId))
-                                .contains(slotId);
-                    } else {
-                        assertThat(
-                                        tracker.getFreeSlots().stream()
-                                                
.map(TaskManagerSlotInformation::getSlotId))
-                                .doesNotContain(slotId);
-                    }
-                });
-
-        tracker.notifyAllocationStart(slotId, jobId);
-        tracker.notifyAllocationComplete(slotId, jobId);
-        tracker.notifyFree(slotId);
-    }
-
-    @Test
-    public void testSlotStatusProcessing() {
-        SlotTracker tracker = new DefaultSlotTracker();
-        SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
0);
-        SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
1);
-        SlotID slotId3 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 
2);
-        tracker.addSlot(slotId1, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
-        tracker.addSlot(slotId2, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
-        tracker.addSlot(slotId3, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, jobId);
-
-        
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
-                .containsExactlyInAnyOrder(slotId1, slotId2);
-
-        // move slot2 to PENDING
-        tracker.notifyAllocationStart(slotId2, jobId);
-
-        final List<SlotStatus> slotReport =
-                Arrays.asList(
-                        new SlotStatus(slotId1, ResourceProfile.ANY, jobId, 
new AllocationID()),
-                        new SlotStatus(slotId2, ResourceProfile.ANY, null, new 
AllocationID()),
-                        new SlotStatus(slotId3, ResourceProfile.ANY, null, new 
AllocationID()));
-
-        assertThat(tracker.notifySlotStatus(slotReport)).isTrue();
-
-        // slot1 should now be allocated; slot2 should continue to be in a 
pending state; slot3
-        // should be freed
-        
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
-                .contains(slotId3);
-
-        // if slot2 is not in a pending state, this will fail with an exception
-        tracker.notifyAllocationComplete(slotId2, jobId);
-
-        final List<SlotStatus> idempotentSlotReport =
-                Arrays.asList(
-                        new SlotStatus(slotId1, ResourceProfile.ANY, jobId, 
new AllocationID()),
-                        new SlotStatus(slotId2, ResourceProfile.ANY, jobId, 
new AllocationID()),
-                        new SlotStatus(slotId3, ResourceProfile.ANY, null, new 
AllocationID()));
-
-        assertThat(tracker.notifySlotStatus(idempotentSlotReport)).isFalse();
-    }
-
-    @Test
-    public void testGetTaskExecutorsWithAllocatedSlotsForJob() {
-        final SlotTracker tracker = new DefaultSlotTracker();
-
-        final JobID jobId = new JobID();
-        final SlotID slotId = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
-
-        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new 
JobID())).isEmpty();
-
-        tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, 
null);
-        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new 
JobID())).isEmpty();
-
-        tracker.notifyAllocationStart(slotId, jobId);
-        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId))
-                .contains(TASK_EXECUTOR_CONNECTION);
-
-        tracker.notifyAllocationComplete(slotId, jobId);
-        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId))
-                .contains(TASK_EXECUTOR_CONNECTION);
-
-        tracker.notifyFree(slotId);
-        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new 
JobID())).isEmpty();
-    }
-
-    private static class SlotStateTransition {
-
-        private final SlotID slotId;
-        private final SlotState oldState;
-        private final SlotState newState;
-        @Nullable private final JobID jobId;
-
-        private SlotStateTransition(
-                SlotID slotId, SlotState oldState, SlotState newState, 
@Nullable JobID jobId) {
-            this.slotId = slotId;
-            this.jobId = jobId;
-            this.oldState = oldState;
-            this.newState = newState;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            SlotStateTransition that = (SlotStateTransition) o;
-            return Objects.equals(slotId, that.slotId)
-                    && oldState == that.oldState
-                    && newState == that.newState
-                    && Objects.equals(jobId, that.jobId);
-        }
-
-        @Override
-        public String toString() {
-            return "SlotStateTransition{"
-                    + "slotId="
-                    + slotId
-                    + ", oldState="
-                    + oldState
-                    + ", newState="
-                    + newState
-                    + ", jobId="
-                    + jobId
-                    + '}';
-        }
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
deleted file mode 100644
index e697cb1e348..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-
-import org.assertj.core.api.Condition;
-import org.junit.jupiter.api.Test;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import static 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusReconcilerTest.SlotStateTransitionMatcher.ofMatcher;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Tests for the {@link DefaultSlotTracker.SlotStatusStateReconciler}. Tests 
all state transitions
- * that could (or should not) occur due to a slot status update. This test 
only checks the target
- * state and job ID for state transitions, because the slot ID is not 
interesting and the slot state
- * is not *actually* being updated. We assume the reconciler locks in a set of 
transitions given a
- * source and target state, without worrying about the correctness of 
intermediate steps (because it
- * shouldn't; and it would be a bit annoying to setup).
- */
-class SlotStatusReconcilerTest {
-
-    private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
-            new TaskExecutorConnection(
-                    ResourceID.generate(),
-                    new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
-
-    @Test
-    void testSlotStatusReconciliationForFreeSlot() {
-        JobID jobId1 = new JobID();
-        StateTransitionTracker stateTransitionTracker = new 
StateTransitionTracker();
-
-        DefaultSlotTracker.SlotStatusStateReconciler reconciler =
-                createSlotStatusReconciler(stateTransitionTracker);
-
-        DeclarativeTaskManagerSlot slot =
-                new DeclarativeTaskManagerSlot(
-                        new SlotID(ResourceID.generate(), 0),
-                        ResourceProfile.ANY,
-                        TASK_EXECUTOR_CONNECTION);
-
-        // free -> free
-        assertThat(reconciler.executeStateTransition(slot, null)).isFalse();
-        assertThat(stateTransitionTracker.stateTransitions).isEmpty();
-
-        // free -> allocated
-        assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue();
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.PENDING, jobId1));
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1));
-    }
-
-    @Test
-    void testSlotStatusReconciliationForPendingSlot() {
-        JobID jobId1 = new JobID();
-        StateTransitionTracker stateTransitionTracker = new 
StateTransitionTracker();
-
-        DefaultSlotTracker.SlotStatusStateReconciler reconciler =
-                createSlotStatusReconciler(stateTransitionTracker);
-
-        DeclarativeTaskManagerSlot slot =
-                new DeclarativeTaskManagerSlot(
-                        new SlotID(ResourceID.generate(), 0),
-                        ResourceProfile.ANY,
-                        TASK_EXECUTOR_CONNECTION);
-        slot.startAllocation(jobId1);
-
-        // pending vs. free; should not trigger any transition because we are 
expecting a slot
-        // allocation in the future
-        assertThat(reconciler.executeStateTransition(slot, null)).isFalse();
-        assertThat(stateTransitionTracker.stateTransitions).isEmpty();
-
-        // pending -> allocated
-        assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue();
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1));
-    }
-
-    @Test
-    void testSlotStatusReconciliationForPendingSlotWithDifferentJobID() {
-        JobID jobId1 = new JobID();
-        JobID jobId2 = new JobID();
-        StateTransitionTracker stateTransitionTracker = new 
StateTransitionTracker();
-
-        DefaultSlotTracker.SlotStatusStateReconciler reconciler =
-                createSlotStatusReconciler(stateTransitionTracker);
-
-        DeclarativeTaskManagerSlot slot =
-                new DeclarativeTaskManagerSlot(
-                        new SlotID(ResourceID.generate(), 0),
-                        ResourceProfile.ANY,
-                        TASK_EXECUTOR_CONNECTION);
-        slot.startAllocation(jobId1);
-
-        // pending(job1) -> free -> pending(job2) -> allocated(job2)
-        assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue();
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.FREE, jobId1));
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.PENDING, jobId2));
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2));
-    }
-
-    @Test
-    void testSlotStatusReconciliationForAllocatedSlot() {
-        JobID jobId1 = new JobID();
-        StateTransitionTracker stateTransitionTracker = new 
StateTransitionTracker();
-
-        DefaultSlotTracker.SlotStatusStateReconciler reconciler =
-                createSlotStatusReconciler(stateTransitionTracker);
-
-        DeclarativeTaskManagerSlot slot =
-                new DeclarativeTaskManagerSlot(
-                        new SlotID(ResourceID.generate(), 0),
-                        ResourceProfile.ANY,
-                        TASK_EXECUTOR_CONNECTION);
-        slot.startAllocation(jobId1);
-        slot.completeAllocation();
-
-        // allocated -> allocated
-        assertThat(reconciler.executeStateTransition(slot, jobId1)).isFalse();
-        assertThat(stateTransitionTracker.stateTransitions).isEmpty();
-
-        // allocated -> free
-        assertThat(reconciler.executeStateTransition(slot, null)).isTrue();
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.FREE, jobId1));
-    }
-
-    @Test
-    void testSlotStatusReconciliationForAllocatedSlotWithDifferentJobID() {
-        JobID jobId1 = new JobID();
-        JobID jobId2 = new JobID();
-        StateTransitionTracker stateTransitionTracker = new 
StateTransitionTracker();
-
-        DefaultSlotTracker.SlotStatusStateReconciler reconciler =
-                createSlotStatusReconciler(stateTransitionTracker);
-
-        DeclarativeTaskManagerSlot slot =
-                new DeclarativeTaskManagerSlot(
-                        new SlotID(ResourceID.generate(), 0),
-                        ResourceProfile.ANY,
-                        TASK_EXECUTOR_CONNECTION);
-        slot.startAllocation(jobId1);
-        slot.completeAllocation();
-
-        // allocated(job1) -> free -> pending(job2) -> allocated(job2)
-        assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue();
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.FREE, jobId1));
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.PENDING, jobId2));
-        assertThat(stateTransitionTracker.stateTransitions.remove())
-                .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2));
-    }
-
-    private static class StateTransitionTracker {
-        Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
-
-        void notifyFree(DeclarativeTaskManagerSlot slot) {
-            stateTransitions.add(new SlotStateTransition(SlotState.FREE, 
slot.getJobId()));
-        }
-
-        void notifyPending(JobID jobId) {
-            stateTransitions.add(new SlotStateTransition(SlotState.PENDING, 
jobId));
-        }
-
-        void notifyAllocated(JobID jobId) {
-            stateTransitions.add(new SlotStateTransition(SlotState.ALLOCATED, 
jobId));
-        }
-    }
-
-    private static DefaultSlotTracker.SlotStatusStateReconciler 
createSlotStatusReconciler(
-            StateTransitionTracker stateTransitionTracker) {
-        return new DefaultSlotTracker.SlotStatusStateReconciler(
-                stateTransitionTracker::notifyFree,
-                (jobId, jobId2) -> 
stateTransitionTracker.notifyPending(jobId2),
-                (jobId1, jobId12) -> 
stateTransitionTracker.notifyAllocated(jobId12));
-    }
-
-    static class SlotStateTransition {
-
-        private final SlotState newState;
-        @Nullable private final JobID jobId;
-
-        private SlotStateTransition(SlotState newState, @Nullable JobID jobId) 
{
-            this.jobId = jobId;
-            this.newState = newState;
-        }
-
-        @Override
-        public String toString() {
-            return "SlotStateTransition{" + ", newState=" + newState + ", 
jobId=" + jobId + '}';
-        }
-    }
-
-    static class SlotStateTransitionMatcher extends 
Condition<SlotStateTransition> {
-
-        private final SlotState targetState;
-        private final JobID jobId;
-
-        private SlotStateTransitionMatcher(SlotState targetState, JobID jobId) 
{
-            this.targetState = targetState;
-            this.jobId = jobId;
-        }
-
-        @Override
-        public boolean matches(SlotStateTransition value) {
-            return value.newState == targetState && jobId.equals(value.jobId);
-        }
-
-        static SlotStateTransitionMatcher ofMatcher(SlotState targetState, 
JobID jobId) {
-            return new SlotStateTransitionMatcher(targetState, jobId);
-        }
-    }
-}

Reply via email to