Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5403#discussion_r170541794
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * A slot profile describes the profile of a slot into which a task wants
to be scheduled. The profile contains
+ * attributes such as resource or locality constraints, some of which may
be hard or soft. A matcher can be generated
+ * to filter out candidate slots by matching their {@link SlotContext}
against the slot profile and, potentially,
+ * further requirements.
+ */
+public class SlotProfile {
+
+ /** Singleton object for a slot profile without any requirements. */
+ private static final SlotProfile NO_REQUIREMENTS =
noLocality(ResourceProfile.UNKNOWN);
+
+ /** This specifies the desired resource profile for the slot. */
+ @Nonnull
+ private final ResourceProfile resourceProfile;
+
+ /** This specifies the preferred locations for the slot. */
+ @Nonnull
+ private final Collection<TaskManagerLocation> preferredLocations;
+
+ /** This contains desired allocation ids of the slot. */
+ @Nonnull
+ private final Collection<AllocationID> priorAllocations;
+
+ public SlotProfile(
+ @Nonnull ResourceProfile resourceProfile,
+ @Nonnull Collection<TaskManagerLocation> preferredLocations,
+ @Nonnull Collection<AllocationID> priorAllocations) {
+ this.resourceProfile = resourceProfile;
+ this.preferredLocations = preferredLocations;
+ this.priorAllocations = priorAllocations;
+ }
+
+ /**
+ * Returns the desired resource profile for the slot.
+ */
+ @Nonnull
+ public ResourceProfile getResourceProfile() {
+ return resourceProfile;
+ }
+
+ /**
+ * Returns the preferred locations for the slot.
+ */
+ @Nonnull
+ public Collection<TaskManagerLocation> getPreferredLocations() {
+ return preferredLocations;
+ }
+
+ /**
+ * Returns the desired allocation ids for the slot.
+ */
+ @Nonnull
+ public Collection<AllocationID> getPriorAllocations() {
+ return priorAllocations;
+ }
+
+ public ProfileToSlotContextMatcher matcher() {
+ if (priorAllocations.isEmpty()) {
+ return new
LocalityAwareRequirementsToSlotMatcher(preferredLocations);
+ } else {
+ return new
PreviousAllocationProfileToSlotContextMatcher(priorAllocations);
+ }
+ }
+
+ /**
+ * Classes that implement this interface provide a method to match
objects that somehow represent slot candidates
+ * against the {@link SlotProfile} that produced the matcher object. A
matching candidate is transformed into a
+ * desired result. If the matcher does not find a matching candidate,
it returns null.
+ */
+ public interface ProfileToSlotContextMatcher {
+
+ /**
+ * @param candidates stream of candidates to
match against.
+ * @param contextExtractor function to extract the
{@link SlotContext} from the candidates.
+ * @param additionalRequirementsFilter predicate to specify
additional requirements for each candidate.
+ * @param resultProducer function to produce a
result from a matching candidate input.
+ * @param <IN> type of the objects
against which we match the profile.
+ * @param <OUT> type of the produced
output from a matching object.
+ * @return the result produced by resultProducer if a matching
candidate was found or null otherwise.
+ */
+ @Nullable
+ <IN, OUT> OUT findMatchWithLocality(
+ @Nonnull Stream<IN> candidates,
+ @Nonnull Function<IN, SlotContext> contextExtractor,
+ @Nonnull Predicate<IN> additionalRequirementsFilter,
+ @Nonnull BiFunction<IN, Locality, OUT> resultProducer);
+ }
+
+ /**
+ * This matcher implementation is used in the presence of prior allocations.
Prior allocations are supposed to overrule
+ * other locality requirements, such as preferred locations. Prior
allocations also require strict matching and
+ * this matcher returns null if it cannot find a candidate for the same
prior allocation. The background is that
+ * this will force the scheduler to request a new slot that is
guaranteed not to be the prior location of any
+ * other subtask, so that subtasks do not steal another subtask's prior
allocation in case their own prior
+ * allocation is no longer available (e.g. machine failure). This is
important to enable local recovery for all
+ * tasks that can still return to their prior allocation.
+ */
+ @VisibleForTesting
+ public static class PreviousAllocationProfileToSlotContextMatcher
implements ProfileToSlotContextMatcher {
+
+ /** Set of prior allocations. */
+ private final HashSet<AllocationID> priorAllocations;
+
+ @VisibleForTesting
+
PreviousAllocationProfileToSlotContextMatcher(Collection<AllocationID>
priorAllocations) {
+ this.priorAllocations = new HashSet<>(priorAllocations);
+ Preconditions.checkState(
+ this.priorAllocations.size() > 0,
+ "This matcher should only be used if there are
prior allocations!");
+ }
+
+ public <I, O> O findMatchWithLocality(
+ @Nonnull Stream<I> candidates,
+ @Nonnull Function<I, SlotContext> contextExtractor,
+ @Nonnull Predicate<I> additionalRequirementsFilter,
+ @Nonnull BiFunction<I, Locality, O> resultProducer) {
+
+ Predicate<I> filterByAllocation =
+ (candidate) ->
priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
+
+ return candidates
+
.filter(filterByAllocation.and(additionalRequirementsFilter))
+ .findFirst()
+ .map((result) -> resultProducer.apply(result,
Locality.LOCAL)) // TODO introduce special locality?
+ .orElse(null);
+ }
+ }
+
+ /**
+ * This matcher is used whenever no prior allocation was specified in
the {@link SlotProfile}. This implementation
+ * tries to achieve best possible locality if a preferred location is
specified in the profile.
+ */
+ @VisibleForTesting
+ public static class LocalityAwareRequirementsToSlotMatcher implements
ProfileToSlotContextMatcher {
+
+ private final Collection<TaskManagerLocation>
locationPreferences;
+
+ @VisibleForTesting
+ public
LocalityAwareRequirementsToSlotMatcher(Collection<TaskManagerLocation>
locationPreferences) {
+ this.locationPreferences = locationPreferences;
+ }
+
+ @Override
+ public <IN, OUT> OUT findMatchWithLocality(
+ @Nonnull Stream<IN> candidates,
+ @Nonnull Function<IN, SlotContext> contextExtractor,
--- End diff --
I think we need to keep this so that we can obtain the allocation id in
`PreviousAllocationProfileToSlotContextMatcher#findMatchWithLocality`.
---