[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-03-02 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1122893111


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java:
##
@@ -143,7 +143,7 @@ private void handleExecutionGraphCreation(
 } else {
 logger.debug(
 "Failed to reserve and assign the required slots. Waiting for new resources.");
-context.goToWaitingForResources();
+context.goToWaitingForResources(null);

Review Comment:
   Good point, thanks!
   No, it's not correct, but the fix isn't complex: I pushed c4aaddb464f39d454d6d4f362cf4a0cf5b4973af to add the `previousExecutionGraph` to `CreatingExecutionGraph` and then pass it to `WaitingForResources`.
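Below is a minimal sketch of the fix described above, under heavy simplification. `CreatingExecutionGraph` keeps the (nullable) previous graph and forwards it when slot reservation fails, instead of passing `null`. Every type here is a hypothetical stand-in named after the thread, not Flink's actual classes.

```java
/**
 * Hypothetical, simplified model of the state handoff discussed in the review:
 * the previous graph travels through CreatingExecutionGraph so that a failed slot
 * reservation can hand it to WaitingForResources. Not Flink's actual classes.
 */
final class PreviousGraphHandoffSketch {

    /** Stand-in for an ExecutionGraph from an earlier attempt. */
    static final class Graph {
        final String name;

        Graph(String name) {
            this.name = name;
        }
    }

    /** Stand-in for the scheduler context that performs state transitions. */
    interface Context {
        /** previousGraph may be null, e.g. on the very first scheduling attempt. */
        void goToWaitingForResources(Graph previousGraph);
    }

    /** Stand-in for the CreatingExecutionGraph state. */
    static final class CreatingExecutionGraph {
        private final Context context;
        private final Graph previousExecutionGraph; // nullable

        CreatingExecutionGraph(Context context, Graph previousExecutionGraph) {
            this.context = context;
            this.previousExecutionGraph = previousExecutionGraph;
        }

        /** Mirrors the "failed to reserve slots" branch of handleExecutionGraphCreation. */
        void onSlotReservationFailed() {
            // Passing null here would drop the allocations needed for local recovery.
            context.goToWaitingForResources(previousExecutionGraph);
        }
    }
}
```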



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-03-02 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1122889885


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##
@@ -784,7 +789,7 @@ public ArchivedExecutionGraph getArchivedExecutionGraph(
 }
 
 @Override
-public void goToWaitingForResources() {
+public void goToWaitingForResources(@Nullable ExecutionGraph executionGraph) {

Review Comment:
   Done in aad45c9f66e0cad768be2f8172b569f61ace3477.



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-03-02 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1122889605


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+
+/** Information about allocations of Job Vertices. */
+@Internal
+public class JobAllocationsInformation {
+
+private final Map<JobVertexID, List<VertexAllocationInformation>> vertexAllocations;
+
+JobAllocationsInformation(
+Map<JobVertexID, List<VertexAllocationInformation>> vertexAllocations) {
+this.vertexAllocations = vertexAllocations;
+}
+
+public static JobAllocationsInformation fromGraph(@Nullable ExecutionGraph graph) {
+return graph == null ? empty() : new JobAllocationsInformation(calculateAllocations(graph));
+}
+
+public List<VertexAllocationInformation> getAllocations(JobVertexID jobVertexID) {
+return vertexAllocations.getOrDefault(jobVertexID, emptyList());
+}
+
+private static Map<JobVertexID, List<VertexAllocationInformation>> calculateAllocations(
+ExecutionGraph graph) {
+final Map<JobVertexID, List<VertexAllocationInformation>> allocations = new HashMap<>();
+for (ExecutionJobVertex vertex : graph.getVerticesTopologically()) {
+JobVertexID jobVertexId = vertex.getJobVertexId();
+for (ExecutionVertex executionVertex : vertex.getTaskVertices()) {
+AllocationID allocationId =
+executionVertex.getCurrentExecutionAttempt().getAssignedAllocationID();
+KeyGroupRange kgr =
+KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+vertex.getMaxParallelism(),
+vertex.getParallelism(),
+executionVertex.getParallelSubtaskIndex());
+allocations
+.computeIfAbsent(jobVertexId, ignored -> new ArrayList<>())
+.add(new VertexAllocationInformation(allocationId, jobVertexId, kgr));
+}
+}
+return allocations;
+}
+
+public static JobAllocationsInformation empty() {
+return new JobAllocationsInformation(emptyMap());
+}
+
+public boolean isEmpty() {
+return vertexAllocations.isEmpty();
+}
+
+/** Information about the allocations of a single Job Vertex. */
+public static class VertexAllocationInformation {
+public final AllocationID allocationID;
+public final JobVertexID jobVertexID;
+public final KeyGroupRange keyGroupRange;

Review Comment:
   Done in 06dede2f641afcb9f4c1d5129806d52d3cbf08a4.
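Some context on the `KeyGroupRange` stored per subtask: the sketch below reproduces, as we understand it, the arithmetic behind `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex`. That arithmetic splits `maxParallelism` key groups into contiguous, nearly equal ranges. The class and method names are made up; this is an illustration, not a replacement for the Flink utility.

```java
/**
 * Hypothetical standalone sketch: which inclusive key-group range a subtask owns.
 * Intended to mirror the arithmetic of Flink's computeKeyGroupRangeForOperatorIndex.
 */
final class KeyGroupRangeSketch {

    /** Returns {start, end}, both inclusive. */
    static int[] rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex out of range");
        }
        // start = ceil(subtaskIndex * maxParallelism / parallelism)
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        // end = floor(((subtaskIndex + 1) * maxParallelism - 1) / parallelism)
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }
}
```

Take `maxParallelism = 128` and `parallelism = 3`. Subtasks 0, 1 and 2 get key groups 0 to 42, 43 to 85 and 86 to 127. A new plan can then match each range to the allocation that previously held it.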



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-28 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1120034105


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {
+private final Map<JobVertexID, Long> averages;
+
+public StateSizeEstimates() {
+this(Collections.emptyMap());
+}
+
+public StateSizeEstimates(Map<JobVertexID, Long> averages) {
+this.averages = averages;
+}
+
+public Optional<Long> estimate(JobVertexID jobVertexId) {
+return Optional.ofNullable(averages.get(jobVertexId));
+}
+
+static StateSizeEstimates empty() {
+return new StateSizeEstimates();
+}
+
+static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
+return Optional.ofNullable(executionGraph)
+.flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
+.flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
+.flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
+.map(
+cp ->
+build(
+fromCompletedCheckpoint(cp),
+mapVerticesToOperators(executionGraph)))
+.orElse(empty());
+}
+
+private static StateSizeEstimates build(
+Map<OperatorID, Long> sizePerOperator,
+Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
+Map<JobVertexID, Long> verticesToSizes =
+verticesToOperators.entrySet().stream()
+.collect(
+toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
+return new StateSizeEstimates(verticesToSizes);
+}
+
+private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
+return ids.stream()
+.mapToLong(key -> sizes.getOrDefault(key, 0L))
+.boxed()
+.reduce(Long::sum)
+.orElse(0L);
+}
+
+private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
+ExecutionGraph executionGraph) {
+return executionGraph.getAllVertices().entrySet().stream()
+.collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
+}
+
+private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
+return v.getOperatorIDs().stream()
+.map(OperatorIDPair::getGeneratedOperatorID)
+.collect(Collectors.toSet());
+}
+
+private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
+Stream<Map.Entry<OperatorID, OperatorState>> states =
+cp.getOperatorStates().entrySet().stream();
+Map<OperatorID, Long> estimates =
+states.collect(
+toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
+return estimates;
+}
+
+private static long estimateKeyGroupStateSize(OperatorState state) {
+Stream<KeyedStateHandle> handles =
+state.getSubtaskStates().values().stream()
+.flatMap(s -> s.getManagedKeyedState().stream());
+Stream> sizeAndCount =
+handles.map(
+  

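Here is a simplified, self-contained sketch of the aggregation `StateSizeEstimates` performs. Per-operator sizes from the latest checkpoint are summed per job vertex, and a missing checkpoint yields no estimates. For self-containment, IDs are plain `String`s and the checkpoint is reduced to a nullable map; both are assumptions. The sketch also uses `mapToLong(...).sum()` as a shorter equivalent of the `boxed().reduce(Long::sum).orElse(0L)` chain in `size`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of summing per-operator state sizes into per-vertex estimates. */
final class StateSizeSketch {

    /** Sums operator sizes per vertex; operators absent from the checkpoint count as 0. */
    static Map<String, Long> perVertex(
            Map<String, Set<String>> verticesToOperators, Map<String, Long> sizePerOperator) {
        return verticesToOperators.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                // sum() of an empty LongStream is 0, like orElse(0L).
                                e -> e.getValue().stream()
                                        .mapToLong(op -> sizePerOperator.getOrDefault(op, 0L))
                                        .sum()));
    }

    /** Null-safe entry point: no latest checkpoint means no estimates. */
    static Map<String, Long> fromLatestCheckpoint(
            Map<String, Long> latestCheckpointSizes, // nullable stand-in for the checkpoint
            Map<String, Set<String>> verticesToOperators) {
        return Optional.ofNullable(latestCheckpointSizes)
                .map(sizes -> perVertex(verticesToOperators, sizes))
                .orElse(Map.of());
    }
}
```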
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-28 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1120015731


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java:
##
@@ -54,14 +58,35 @@ public interface SlotAllocator {
 Optional determineParallelism(
 JobInformation jobInformation, Collection slots);
 
+/**
+ * Same as {@link #determineParallelism(JobInformation, Collection)} but additionally determine
+ * assignment of slots to execution slot sharing groups.
+ */
+default Optional determineParallelismAndCalculateAssignment(
+JobInformation jobInformation,
+Collection slots,
+@Nullable ExecutionGraph previousExecutionGraph) {
+return determineParallelismAndCalculateAssignment(
+jobInformation,
+slots,
+JobAllocationsInformation.fromGraph(
+previousExecutionGraph,
+StateSizeEstimates.fromGraph(previousExecutionGraph)));
+}

Review Comment:
   Done in 106dbceed065c38347f33e7c9320e0b0b904e0ba.
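The following hypothetical sketch shows the pattern in this `SlotAllocator` hunk. A `default` overload derives allocation information from a nullable previous run and delegates to the abstract method, so a test double needs to implement only that one method (here, as a lambda). The types are simplified stand-ins and the method names are illustrative.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for an allocator interface with a convenience overload. */
interface AllocatorSketch {

    /** The single abstract method a test double implements; takes pre-derived info. */
    Optional<String> plan(List<String> freeSlots, Map<String, String> previousAllocations);

    /** Convenience overload: derives allocations from a nullable previous run, then delegates. */
    default Optional<String> plan(List<String> freeSlots, PreviousRun previousRun) {
        Map<String, String> allocations =
                previousRun == null ? Map.of() : previousRun.allocationsByVertex();
        return plan(freeSlots, allocations);
    }

    /** Stand-in for the previous ExecutionGraph. */
    interface PreviousRun {
        Map<String, String> allocationsByVertex();
    }
}
```

Once callers can build the allocation information themselves, the convenience overload adds little. That is the conclusion reached later in this thread.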



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-28 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119982866


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java:
##
@@ -23,14 +23,27 @@
 import java.util.Map;
 
 /**
- * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)}, describing
- * the parallelism each vertex could be scheduled with.
- *
- * {@link SlotAllocator} implementations may encode additional information to be used in {@link
- * SlotAllocator#tryReserveResources(VertexParallelism)}.
+ * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)} among with
+ * {@link org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment
+ * slotAssignments}, describing the parallelism each vertex could be scheduled with.
  */
-public interface VertexParallelism {
-Map<JobVertexID, Integer> getMaxParallelismForVertices();
+public class VertexParallelism {

Review Comment:
   Good point, I'll adjust the interface and its javadoc.
   
   4b5987c1da0c34069e029c00e5d891ae3261fef9
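The next block is a minimal sketch of `VertexParallelism` as a concrete value class rather than an interface. It assumes the class mainly wraps a vertex-to-parallelism map. Vertex IDs are `String`s and the `getParallelism` accessor is hypothetical. The defensive copy is a design choice of this sketch, not something the thread specifies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical immutable value class mapping vertex IDs to their parallelism. */
final class VertexParallelismSketch {
    private final Map<String, Integer> parallelismForVertices;

    VertexParallelismSketch(Map<String, Integer> parallelismForVertices) {
        // Defensive, unmodifiable copy: later changes to the caller's map are not observed.
        this.parallelismForVertices =
                Collections.unmodifiableMap(new HashMap<>(parallelismForVertices));
    }

    Map<String, Integer> getMaxParallelismForVertices() {
        return parallelismForVertices;
    }

    /** Hypothetical accessor: empty if the vertex is unknown. */
    Optional<Integer> getParallelism(String vertexId) {
        return Optional.ofNullable(parallelismForVertices.get(vertexId));
    }

    static VertexParallelismSketch empty() {
        return new VertexParallelismSketch(Collections.emptyMap());
    }
}
```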
   



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-28 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119982866


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java:
##
@@ -23,14 +23,27 @@
 import java.util.Map;
 
 /**
- * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)}, describing
- * the parallelism each vertex could be scheduled with.
- *
- * {@link SlotAllocator} implementations may encode additional information to be used in {@link
- * SlotAllocator#tryReserveResources(VertexParallelism)}.
+ * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)} among with
+ * {@link org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment
+ * slotAssignments}, describing the parallelism each vertex could be scheduled with.
  */
-public interface VertexParallelism {
-Map<JobVertexID, Integer> getMaxParallelismForVertices();
+public class VertexParallelism {

Review Comment:
   Good point, I'll adjust the interface and its javadoc.
   
   106dbceed065c38347f33e7c9320e0b0b904e0ba



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-28 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1120003300


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java:
##
@@ -54,14 +58,35 @@ public interface SlotAllocator {
 Optional determineParallelism(
 JobInformation jobInformation, Collection slots);
 
+/**
+ * Same as {@link #determineParallelism(JobInformation, Collection)} but additionally determine
+ * assignment of slots to execution slot sharing groups.
+ */
+default Optional determineParallelismAndCalculateAssignment(
+JobInformation jobInformation,
+Collection slots,
+@Nullable ExecutionGraph previousExecutionGraph) {
+return determineParallelismAndCalculateAssignment(
+jobInformation,
+slots,
+JobAllocationsInformation.fromGraph(
+previousExecutionGraph,
+StateSizeEstimates.fromGraph(previousExecutionGraph)));
+}

Review Comment:
   Right, mocking was the reason why I introduced the second method.
   I agree: now that there is `JobAllocationsInformation`, that method can be called directly from the scheduler.
   I'll remove the overload that takes an `ExecutionGraph`.



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-28 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119993003


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##
@@ -1401,7 +1403,9 @@ public void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvail
 adaptiveScheduler.tryToAssignSlots(
 CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
 new StateTrackingMockExecutionGraph(),
-new CreatingExecutionGraphTest.TestingVertexParallelism()));
+new JobSchedulingPlan(
+new VertexParallelism(Collections.emptyMap()),

Review Comment:
   Done in 1953ef9a6f295b77b410e96de9649cace1aaad34.



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-28 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119982866


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java:
##
@@ -23,14 +23,27 @@
 import java.util.Map;
 
 /**
- * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)}, describing
- * the parallelism each vertex could be scheduled with.
- *
- * {@link SlotAllocator} implementations may encode additional information to be used in {@link
- * SlotAllocator#tryReserveResources(VertexParallelism)}.
+ * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)} among with
+ * {@link org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment
+ * slotAssignments}, describing the parallelism each vertex could be scheduled with.
  */
-public interface VertexParallelism {
-Map<JobVertexID, Integer> getMaxParallelismForVertices();
+public class VertexParallelism {

Review Comment:
   Good point, I'll adjust the interface and its javadoc.



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-28 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119952982


##
flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java:
##
@@ -67,12 +68,14 @@
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests local recovery by restarting Flink processes. */
 @ExtendWith(TestLoggerExtension.class)
 class LocalRecoveryITCase {

Review Comment:
   The test wasn't failing because the failed assertion only failed the task and triggered a restart; eventually, the right allocation ID was picked up.
   I fixed it in 6c0284ca7bfe09a94740db27db38279234222c63 `[hotfix][tests] Make LocalRecoveryITCase fail when allocations don't match` by checking for any allocationID mismatches (in the main thread).
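Below is a sketch of the general test pattern described above, with hypothetical names. Task code records each mismatch in a thread-safe queue instead of throwing, because throwing would only fail and restart the task. The test's main thread then asserts on the queue. This assumes the task code shares a JVM with the test. In a multi-process setup such as this ITCase, the records would have to travel through some other channel, and the sketch does not show how the actual commit does it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical collector: tasks record mismatches, the main thread asserts on them. */
final class MismatchCollectorSketch {
    private final Queue<String> mismatches = new ConcurrentLinkedQueue<>();

    /** Called from task threads; records instead of throwing (throwing would just restart the task). */
    void check(int subtask, String expectedAllocation, String actualAllocation) {
        if (!expectedAllocation.equals(actualAllocation)) {
            mismatches.add(
                    "subtask " + subtask + ": expected " + expectedAllocation
                            + " but got " + actualAllocation);
        }
    }

    /** Called from the test's main thread once the job has run. */
    void assertNoMismatches() {
        List<String> snapshot = new ArrayList<>(mismatches);
        if (!snapshot.isEmpty()) {
            throw new AssertionError("Allocation mismatches: " + snapshot);
        }
    }
}
```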



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-28 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119813160


##
flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java:
##
@@ -67,12 +68,14 @@
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests local recovery by restarting Flink processes. */
 @ExtendWith(TestLoggerExtension.class)
 class LocalRecoveryITCase {

Review Comment:
   When the Adaptive Scheduler is enabled via the `enable-adaptive-scheduler` maven [profile](https://github.com/apache/flink/blob/161014149e803bfd1d3653badb230b2ed36ce3cb/pom.xml#L976), it gets picked up by [`ClusterOptions`](https://github.com/apache/flink/blob/161014149e803bfd1d3653badb230b2ed36ce3cb/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java#L226).
   
   This test then kills a TM and verifies that a task gets rescheduled onto the TM with the same `AllocationID`.



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-27 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119009430


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##
@@ -978,8 +983,8 @@ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
 executionGraph.setInternalTaskFailuresListener(
 new UpdateSchedulerNgOnInternalFailuresListener(this));
 
-final VertexParallelism vertexParallelism =
-executionGraphWithVertexParallelism.getVertexParallelism();
+final JobSchedulingPlan vertexParallelism =

Review Comment:
   Renamed in ccbc071620f4f2c5c2ecb3905afb1afc4c5e6e1d.



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-27 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119007288


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. */
+public class DefaultSlotAssigner implements SlotAssigner {
+
+@Override
+public Collection<SlotAssignment> assignSlots(
+JobInformation jobInformation,
+Collection<? extends SlotInfo> freeSlots,
+VertexParallelism vertexParallelism,
+AllocationsInfo previousAllocations,
+StateSizeEstimates stateSizeEstimates) {
+List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
+}
+
+Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+Collection<SlotAssignment> assignments = new ArrayList<>();
+for (ExecutionSlotSharingGroup group : allGroups) {
+assignments.add(new SlotAssignment(iterator.next(), group));

Review Comment:
   Added checks in 70379da8d1bb9d43b0218f8a05ccec22819e222c.
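The thread doesn't show which checks were added, so the following is only one plausible shape, with hypothetical names and `String` stand-ins for slots and groups. It verifies up front that there are at least as many free slots as groups. A shortage then fails with a descriptive message instead of a `NoSuchElementException` from `iterator.next()`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch: pair groups with free slots in iteration order, checking capacity first. */
final class SlotAssignerSketch {

    /** A (slot, group) pair; both are plain Strings in this sketch. */
    static final class Assignment {
        final String slot;
        final String group;

        Assignment(String slot, String group) {
            this.slot = slot;
            this.group = group;
        }
    }

    static List<Assignment> assign(Collection<String> freeSlots, List<String> groups) {
        if (freeSlots.size() < groups.size()) {
            throw new IllegalStateException(
                    "Not enough free slots: " + freeSlots.size()
                            + " available for " + groups.size() + " groups");
        }
        Iterator<String> slots = freeSlots.iterator();
        List<Assignment> assignments = new ArrayList<>();
        for (String group : groups) {
            // Cannot run out: the size check above guarantees enough slots.
            assignments.add(new Assignment(slots.next(), group));
        }
        return assignments;
    }
}
```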



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-27 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119006691


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+
+class AllocationsInfo {

Review Comment:
   Renamed in a4ae3d608e449dd448861dbd1d0fb1f129d5824b.



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117292622


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {
+private final Map<JobVertexID, Long> averages;
+
+public StateSizeEstimates() {
+this(Collections.emptyMap());
+}
+
+public StateSizeEstimates(Map<JobVertexID, Long> averages) {
+this.averages = averages;
+}
+
+public Optional<Long> estimate(JobVertexID jobVertexId) {
+return Optional.ofNullable(averages.get(jobVertexId));
+}
+
+static StateSizeEstimates empty() {
+return new StateSizeEstimates();
+}
+
+static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
+return Optional.ofNullable(executionGraph)
+.flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
+.flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
+.flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
+.map(
+cp ->
+build(
+fromCompletedCheckpoint(cp),
+mapVerticesToOperators(executionGraph)))
+.orElse(empty());
+}
+
+private static StateSizeEstimates build(
+Map<OperatorID, Long> sizePerOperator,
+Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
+Map<JobVertexID, Long> verticesToSizes =
+verticesToOperators.entrySet().stream()
+.collect(
+toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
+return new StateSizeEstimates(verticesToSizes);
+}
+
+private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
+return ids.stream()
+.mapToLong(key -> sizes.getOrDefault(key, 0L))
+.boxed()
+.reduce(Long::sum)
+.orElse(0L);
+}
+
+private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
+ExecutionGraph executionGraph) {
+return executionGraph.getAllVertices().entrySet().stream()
+.collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
+}
+
+private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
+return v.getOperatorIDs().stream()
+.map(OperatorIDPair::getGeneratedOperatorID)
+.collect(Collectors.toSet());
+}
+
+private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
+Stream<Map.Entry<OperatorID, OperatorState>> states =
+cp.getOperatorStates().entrySet().stream();
+Map<OperatorID, Long> estimates =
+states.collect(
+toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
+return estimates;
+}
+
+private static long estimateKeyGroupStateSize(OperatorState state) {
+Stream<KeyedStateHandle> handles =
+state.getSubtaskStates().values().stream()
+.flatMap(s -> s.getManagedKeyedState().stream());
+Stream> sizeAndCount =
+handles.map(
+  

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117292622


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {
+private final Map<JobVertexID, Long> averages;
+
+public StateSizeEstimates() {
+this(Collections.emptyMap());
+}
+
+public StateSizeEstimates(Map<JobVertexID, Long> averages) {
+this.averages = averages;
+}
+
+public Optional<Long> estimate(JobVertexID jobVertexId) {
+return Optional.ofNullable(averages.get(jobVertexId));
+}
+
+static StateSizeEstimates empty() {
+return new StateSizeEstimates();
+}
+
+static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
+return Optional.ofNullable(executionGraph)
+.flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
+.flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
+.flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
+.map(
+cp ->
+build(
+fromCompletedCheckpoint(cp),
+mapVerticesToOperators(executionGraph)))
+.orElse(empty());
+}
+
+private static StateSizeEstimates build(
+Map<OperatorID, Long> sizePerOperator,
+Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
+Map<JobVertexID, Long> verticesToSizes =
+verticesToOperators.entrySet().stream()
+.collect(
+toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
+return new StateSizeEstimates(verticesToSizes);
+}
+
+private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
+return ids.stream()
+.mapToLong(key -> sizes.getOrDefault(key, 0L))
+.boxed()
+.reduce(Long::sum)
+.orElse(0L);
+}
+
+private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
+ExecutionGraph executionGraph) {
+return executionGraph.getAllVertices().entrySet().stream()
+.collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
+}
+
+private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
+return v.getOperatorIDs().stream()
+.map(OperatorIDPair::getGeneratedOperatorID)
+.collect(Collectors.toSet());
+}
+
+private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
+Stream<Map.Entry<OperatorID, OperatorState>> states =
+cp.getOperatorStates().entrySet().stream();
+Map<OperatorID, Long> estimates =
+states.collect(
+toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
+return estimates;
+}
+
+private static long estimateKeyGroupStateSize(OperatorState state) {
+Stream<KeyedStateHandle> handles =
+state.getSubtaskStates().values().stream()
+.flatMap(s -> s.getManagedKeyedState().stream());
+Stream> sizeAndCount =
+handles.map(
+  

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117237527


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+
+class AllocationsInfo {

Review Comment:
   Yes, I agree.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116992436


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+
+class AllocationsInfo {

Review Comment:
   Implemented in 6d4971f8f646409ff09173f7dfe1d5245b95706f and 5fe039b4cb507fcfcc88906c40d86561fc9e8ed5.




[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116959052


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -157,13 +165,21 @@ public Map<AllocationID, Long> calculateScore(
 .getMaxParallelism(),
 parallelism.get(evi.getJobVertexId()),
 evi.getSubtaskIndex());
+// Estimate state size per key group. For scoring, assume 1 if size estimate is 0 to
+// accommodate for averaging non-zero states
+Optional<Long> kgSizeMaybe =
+stateSizeEstimates.estimate(evi.getJobVertexId()).map(e -> Math.max(e, 1L));
+if (!kgSizeMaybe.isPresent()) {
+continue;
+}

Review Comment:
   We still need to consider this state: if we place the task on a different TM, it will have to download all of its SST files (or am I missing something?)
   
   There are two size methods for keyed state:
   1. `handle.getStateSize()` returns the full state size
   2. [`handle.getCheckpointedSize()`](https://github.com/apache/flink/blob/464ded1c2a0497255b70f711167c3b7ae52ea0f7/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java#L62) returns the "incremental" state size
   
   As per above, `getStateSize` is used.
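The sketch below is a minimal, self-contained illustration of how a per-key-group estimate could be derived from the full state size rather than the incremental one. It is not Flink code. `HandleInfo` and `averageSizePerKeyGroup` are hypothetical stand-ins. Dividing the total size by the total number of key groups is an assumption about the averaging, because the `StateSizeEstimates` hunk is truncated before that logic. For brevity, the `Math.max(e, 1L)` clamp from the `StateLocalitySlotAssigner` hunk is folded into the estimate.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class KeyGroupSizeEstimate {

    // Hypothetical stand-in for a keyed state handle (not a Flink type): the full state size
    // (like getStateSize()), the incremental size (like getCheckpointedSize()), and the number
    // of key groups the handle covers.
    static final class HandleInfo {
        final long stateSize;
        final long checkpointedSize;
        final int numKeyGroups;

        HandleInfo(long stateSize, long checkpointedSize, int numKeyGroups) {
            this.stateSize = stateSize;
            this.checkpointedSize = checkpointedSize;
            this.numKeyGroups = numKeyGroups;
        }
    }

    // Average full state size per key group over all handles. checkpointedSize is deliberately
    // ignored: a task placed on a different TM has to download all of its files, not only the
    // latest increment.
    static long averageSizePerKeyGroup(List<HandleInfo> handles) {
        long totalSize = 0;
        long totalKeyGroups = 0;
        for (HandleInfo h : handles) {
            totalSize += h.stateSize;
            totalKeyGroups += h.numKeyGroups;
        }
        long average = totalKeyGroups == 0 ? 0 : totalSize / totalKeyGroups;
        // Like Math.max(e, 1L) in the hunk: a zero estimate is treated as 1 for scoring.
        return Math.max(average, 1L);
    }

    public static void main(String[] args) {
        List<HandleInfo> handles =
                Arrays.asList(new HandleInfo(1000, 100, 10), new HandleInfo(3000, 50, 10));
        System.out.println(averageSizePerKeyGroup(handles)); // prints 200 (4000 / 20)
        System.out.println(averageSizePerKeyGroup(Collections.<HandleInfo>emptyList())); // prints 1
    }
}
```

Using the incremental size here would make a task with a large, rarely changing state look cheap to move. That is exactly the case where losing locality costs the most.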




[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116928384


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -116,66 +116,53 @@ public Collection<SlotAssignment> assignSlots(
 JobInformation jobInformation,
 Collection<? extends SlotInfo> freeSlots,
 VertexParallelism vertexParallelism) {
-Collection<? extends SlotInfo> remainingSlots = freeSlots;
-final Collection<SlotAssignment> assignments = new ArrayList<>();
+final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
 for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
-
-List<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
-createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup);
-
-SlotAssigner.AssignmentResult result =
-assignSlots(remainingSlots, sharedSlotToVertexAssignment);
-remainingSlots = result.remainingSlots;
-assignments.addAll(result.assignments);
+allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
 }
-return assignments;
-}
-
-private AssignmentResult assignSlots(

Review Comment:
   Addressed by d3595c472e38a96b66a9ee7ac1a394706802d431.




[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116916563


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {
+private final Map<JobVertexID, Long> averages;
+
+public StateSizeEstimates() {
+this(Collections.emptyMap());
+}
+
+public StateSizeEstimates(Map<JobVertexID, Long> averages) {
+this.averages = averages;
+}
+
+public Optional<Long> estimate(JobVertexID jobVertexId) {
+return Optional.ofNullable(averages.get(jobVertexId));
+}
+
+static StateSizeEstimates empty() {
+return new StateSizeEstimates();
+}
+
+static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
+return Optional.ofNullable(executionGraph)
+.flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
+.flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
+.flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
+.map(
+cp ->
+build(
+fromCompletedCheckpoint(cp),
+mapVerticesToOperators(executionGraph)))
+.orElse(empty());
+}
+
+private static StateSizeEstimates build(
+Map<OperatorID, Long> sizePerOperator,
+Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
+Map<JobVertexID, Long> verticesToSizes =
+verticesToOperators.entrySet().stream()
+.collect(
+toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
+return new StateSizeEstimates(verticesToSizes);
+}
+
+private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
+return ids.stream()
+.mapToLong(key -> sizes.getOrDefault(key, 0L))
+.boxed()
+.reduce(Long::sum)
+.orElse(0L);
+}
+
+private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
+ExecutionGraph executionGraph) {
+return executionGraph.getAllVertices().entrySet().stream()
+.collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
+}
+
+private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
+return v.getOperatorIDs().stream()
+.map(OperatorIDPair::getGeneratedOperatorID)
+.collect(Collectors.toSet());
+}
+
+private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
+Stream<Map.Entry<OperatorID, OperatorState>> states =
+cp.getOperatorStates().entrySet().stream();
+Map<OperatorID, Long> estimates =
+states.collect(
+toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
+return estimates;
+}
+
+private static long estimateKeyGroupStateSize(OperatorState state) {
+Stream<KeyedStateHandle> handles =
+state.getSubtaskStates().values().stream()
+.flatMap(s -> s.getManagedKeyedState().stream());
+Stream> sizeAndCount =
+handles.map(
+  

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116904470


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements Comparable<AllocationScore> {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, long score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final long score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public long getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Long.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+@Override
+public Collection<SlotAssignment> assignSlots(
+JobInformation jobInformation,
+Collection<? extends SlotInfo> freeSlots,
+VertexParallelism vertexParallelism,
+AllocationsInfo previousAllocations,
+StateSizeEstimates stateSizeEstimates) {
+final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
+}
+final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups);
+
+// PQ orders the pairs (allocationID, groupID) by score, decreasing
+// the score is computed as the potential amount of state that would reside locally
+final PriorityQueue<AllocationScore> scores =
+new PriorityQueue<>(Comparator.reverseOrder());
+for (ExecutionSlotSharingGroup group : allGroups) {
+calculateScore(
+group,
+parallelism,
+jobInformation,
+previousAllocations,
+stateSizeEstimates)
+.entrySet().stream()
+.map(e -> new AllocationScore(group.getId(), e.getKey(), e.getValue()))
+.forEach(scores::add);
+}
+
+   
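The scoring comment in the hunk above describes a greedy scheme. Every (allocation, group) pair gets a score, and the pairs are consumed in decreasing score order through a `PriorityQueue` built with `Comparator.reverseOrder()`. The sketch below illustrates that idea under stated assumptions:

- `Score` is a hypothetical stand-in for `AllocationScore`. It uses the same comparison order but `String` IDs instead of `AllocationID`.
- Each allocation and each group is used at most once.

The hunk is truncated before the actual consumption loop, so this is not the PR's implementation.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

public class GreedyLocalityAssignment {

    // Hypothetical stand-in for AllocationScore: (group, allocation, score) with String IDs.
    static final class Score implements Comparable<Score> {
        final String group;
        final String allocation;
        final long score;

        Score(String group, String allocation, long score) {
            this.group = group;
            this.allocation = allocation;
            this.score = score;
        }

        // Same ordering as AllocationScore.compareTo: score first, then IDs as tie-breakers.
        @Override
        public int compareTo(Score other) {
            int result = Long.compare(score, other.score);
            if (result != 0) {
                return result;
            }
            result = other.allocation.compareTo(allocation);
            if (result != 0) {
                return result;
            }
            return other.group.compareTo(group);
        }
    }

    // Greedily maps group -> allocation, taking the highest-scoring pairs first and
    // skipping pairs whose group or allocation is already taken.
    static Map<String, String> assign(List<Score> scores) {
        PriorityQueue<Score> queue = new PriorityQueue<>(Comparator.reverseOrder());
        queue.addAll(scores);
        Map<String, String> assignment = new HashMap<>();
        Set<String> usedAllocations = new HashSet<>();
        while (!queue.isEmpty()) {
            Score s = queue.poll();
            if (assignment.containsKey(s.group) || usedAllocations.contains(s.allocation)) {
                continue;
            }
            assignment.put(s.group, s.allocation);
            usedAllocations.add(s.allocation);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<Score> scores =
                Arrays.asList(
                        new Score("g1", "a1", 100),
                        new Score("g1", "a2", 10),
                        new Score("g2", "a1", 90),
                        new Score("g2", "a2", 5));
        // g1 takes a1 (score 100); g2 falls back to a2 because a1 is already used.
        System.out.println(assign(scores));
    }
}
```

Some groups may never receive a scored pair, for example vertices without local state. They would still need a fallback to any remaining free slot, and that step is omitted here.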

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116899011


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {

Review Comment:
   Sure, but as I mentioned [above](https://github.com/apache/flink/pull/21981#issuecomment-1442587044), I intend to move this commit to a separate PR/ticket and add the tests there.
   
   To me, this PR seems big enough already, and it's still meaningful without state size estimation (I added the `StateSizeEstimates` commit just to validate the interfaces).
   
   WDYT?




[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116894644


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. */
+public class DefaultSlotAssigner implements SlotAssigner {
+
+@Override
+public Collection<SlotAssignment> assignSlots(
+JobInformation jobInformation,
+Collection<? extends SlotInfo> freeSlots,
+VertexParallelism vertexParallelism,
+AllocationsInfo previousAllocations,
+StateSizeEstimates stateSizeEstimates) {
+List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
+}
+
+Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+Collection<SlotAssignment> assignments = new ArrayList<>();
+for (ExecutionSlotSharingGroup group : allGroups) {
+assignments.add(new SlotAssignment(iterator.next(), group));

Review Comment:
   This shouldn't happen because the computed DoP already takes the number of slots into account. This is also [the case](https://github.com/apache/flink/blob/b987ae18d0bc353c631bc54871b0c16be39dbad2/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java#L129) now on `master`.
   
   However, slot assignment is now decoupled from the DoP calculation, so this guarantee is no longer obvious.
   WDYT about checking `iterator.hasNext()` and just raising an error with a relevant message if not?
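Here is a minimal sketch of the suggested guard. Generic types stand in for `ExecutionSlotSharingGroup` and `SlotInfo`. The method name `assignInOrder` and the message text are hypothetical. Inside Flink, `Preconditions.checkState` from `org.apache.flink.util` would be a natural alternative to throwing `IllegalStateException` by hand.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class OrderedSlotAssignment {

    // Pairs each group with the next free slot and fails fast, with a descriptive
    // message, instead of letting iterator.next() throw NoSuchElementException.
    static <G, S> List<Map.Entry<G, S>> assignInOrder(
            Collection<? extends G> groups, Collection<? extends S> freeSlots) {
        Iterator<? extends S> iterator = freeSlots.iterator();
        List<Map.Entry<G, S>> assignments = new ArrayList<>();
        for (G group : groups) {
            if (!iterator.hasNext()) {
                throw new IllegalStateException(
                        String.format(
                                "Not enough free slots: needed %d but got %d.",
                                groups.size(), freeSlots.size()));
            }
            S slot = iterator.next();
            assignments.add(new AbstractMap.SimpleImmutableEntry<G, S>(group, slot));
        }
        return assignments;
    }

    public static void main(String[] args) {
        // Prints [g1=s1, g2=s2]; the surplus slot s3 stays unused.
        System.out.println(
                assignInOrder(Arrays.asList("g1", "g2"), Arrays.asList("s1", "s2", "s3")));
        try {
            assignInOrder(Arrays.asList("g1", "g2"), Arrays.asList("s1"));
        } catch (IllegalStateException e) {
            // Prints: Not enough free slots: needed 2 but got 1.
            System.out.println(e.getMessage());
        }
    }
}
```

Reporting both counts makes a mismatch between the DoP calculation and the available slots visible in the error itself.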




[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116887448


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+
+class AllocationsInfo {

Review Comment:
   I agree, good points :+1: 




[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116881589


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+    private static class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String group;
+        private final AllocationID allocationId;
+
+        public AllocationScore(String group, AllocationID allocationId, long score) {
+            this.group = group;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        private final long score;
+
+        public String getGroup() {
+            return group;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public long getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+            int result = Long.compare(score, other.score);
+            if (result != 0) {
+                return result;
+            }
+            result = other.allocationId.compareTo(allocationId);
+            if (result != 0) {
+                return result;
+            }
+            return other.group.compareTo(group);
+        }
+    }
+
+    @Override
+    public Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection freeSlots,
+            VertexParallelism vertexParallelism,
+            AllocationsInfo previousAllocations,
+            StateSizeEstimates stateSizeEstimates) {
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
+        }
+        final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups);
+
+        // PQ orders the pairs (allocationID, groupID) by score, decreasing
+        // the score is computed as the potential amount of state that would reside locally
+        final PriorityQueue<AllocationScore> scores =
+                new PriorityQueue<>(Comparator.reverseOrder());
+        for (ExecutionSlotSharingGroup group : allGroups) {
+            calculateScore(
+                            group,
+                            parallelism,
+                            jobInformation,
+                            previousAllocations,
+                            stateSizeEstimates)
+                    .entrySet().stream()
+                    .map(e -> new AllocationScore(group.getId(), e.getKey(), e.getValue()))
+                    .forEach(scores::add);
+        }
+
+   

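The hunk is cut off in the archive right after the queue is filled, so how the queue is drained is not shown here. A minimal, self-contained sketch of the usual greedy strategy, assuming each group and each allocation may be matched at most once; `Score` and `GreedyLocalityAssigner` are hypothetical names that only mirror `AllocationScore` and its ordering:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical stand-in for AllocationScore: one (group, allocation, score) candidate pair. */
final class Score implements Comparable<Score> {
    final String group;
    final String allocation;
    final long score;

    Score(String group, String allocation, long score) {
        this.group = group;
        this.allocation = allocation;
        this.score = score;
    }

    @Override
    public int compareTo(Score other) {
        // Order by score first; break ties by allocation, then group, so polling is deterministic.
        int result = Long.compare(score, other.score);
        if (result != 0) {
            return result;
        }
        result = other.allocation.compareTo(allocation);
        if (result != 0) {
            return result;
        }
        return other.group.compareTo(group);
    }
}

/** Hypothetical helper: drains the queue greedily, highest score first. */
final class GreedyLocalityAssigner {
    static Map<String, String> assign(List<Score> candidates) {
        // PriorityQueue is a min-heap; reverseOrder() makes poll() return the highest score.
        PriorityQueue<Score> queue = new PriorityQueue<>(Comparator.reverseOrder());
        queue.addAll(candidates);
        Map<String, String> groupToAllocation = new HashMap<>();
        Set<String> usedAllocations = new HashSet<>();
        while (!queue.isEmpty()) {
            Score best = queue.poll();
            // Skip pairs whose group or allocation was already taken by a higher-scoring pair.
            if (groupToAllocation.containsKey(best.group)
                    || usedAllocations.contains(best.allocation)) {
                continue;
            }
            groupToAllocation.put(best.group, best.allocation);
            usedAllocations.add(best.allocation);
        }
        // Groups without any candidate stay unassigned; a caller would give them leftover slots.
        return groupToAllocation;
    }
}
```

The greedy choice is simple and deterministic, but it is not guaranteed to maximize the total amount of local state across all groups.
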
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116813631


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobSchedulingPlan.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+
+import java.util.Collection;
+
+/**
+ * A plan that describes how to execute {@link org.apache.flink.runtime.jobgraph.JobGraph JobGraph}.
+ *
+ * 
+ *   {@link #vertexParallelism} is necessary to create {@link
+ *   org.apache.flink.runtime.executiongraph.ExecutionGraph ExecutionGraph}
+ *   {@link #slotAssignments} are used to schedule it onto the cluster
+ * 
+ *
+ * {@link AdaptiveScheduler} passes this structure from {@link WaitingForResources} to {@link
+ * CreatingExecutionGraph} stages.
+ */
+@Internal
+public class JobSchedulingPlan {
+    private final VertexParallelism vertexParallelism;
+    private final Collection<SlotAssignment> slotAssignments;
+
+    public JobSchedulingPlan(
+            VertexParallelism vertexParallelism, Collection<SlotAssignment> slotAssignments) {
+        this.vertexParallelism = vertexParallelism;
+        this.slotAssignments = slotAssignments;
+    }
+
+    public VertexParallelism getVertexParallelism() {
+        return vertexParallelism;
+    }
+
+    public Collection<SlotAssignment> getSlotAssignments() {
+        return slotAssignments;
+    }
+
+    /** Assignment of a slot to some target (e.g. a slot sharing group). */
+    public static class SlotAssignment {
+        private final SlotInfo slotInfo;
+        /**
+         * Interpreted by {@link
+         * org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator#tryReserveResources(JobSchedulingPlan)}.
+         * This can be a slot sharing group, a task, or something else.
+         */
+        private final Object target;

Review Comment:
   Yes, but I wanted to keep the ability to change that. Currently (in `master`), other types are allowed by implementing `VertexParallelism`.
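
A minimal sketch of why an untyped target keeps that flexibility: the allocator that builds the plan is also the one that reads it back, so it can recover its own target type with a checked cast. `Assignment`, `SharingGroup` and `getTargetAs` are hypothetical names for illustration, not the PR's API:

```java
import java.util.Objects;

/** Hypothetical analogue of JobSchedulingPlan.SlotAssignment: the target is deliberately untyped. */
final class Assignment {
    private final String slotId; // stand-in for SlotInfo
    private final Object target; // e.g. a slot sharing group, a task, or something else

    Assignment(String slotId, Object target) {
        this.slotId = Objects.requireNonNull(slotId);
        this.target = Objects.requireNonNull(target);
    }

    String getSlotId() {
        return slotId;
    }

    /** Lets the component that produced the plan read the target back with its own type. */
    <T> T getTargetAs(Class<T> type) {
        return type.cast(target); // throws ClassCastException on a type mismatch
    }
}

/** Hypothetical target type used by one particular allocator. */
final class SharingGroup {
    final String id;

    SharingGroup(String id) {
        this.id = id;
    }
}
```

Only the producer and consumer of the plan need to agree on the target type; code that merely passes the plan along never inspects it.
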






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116811933


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -157,13 +165,21 @@ public Map calculateScore(
 .getMaxParallelism(),
 parallelism.get(evi.getJobVertexId()),
 evi.getSubtaskIndex());
+            // Estimate state size per key group. For scoring, assume 1 if size estimate is 0 to
+            // accommodate for averaging non-zero states
+            Optional<Long> kgSizeMaybe =
+                    stateSizeEstimates.estimate(evi.getJobVertexId()).map(e -> Math.max(e, 1L));
+            if (!kgSizeMaybe.isPresent()) {
+                continue;
+            }

Review Comment:
   Yes (the operator didn't add **any** managed keyed state to the latest checkpoint).
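
A sketch of how such an estimate can feed a locality score, assuming the score of a candidate allocation is the number of key groups it keeps local multiplied by the per-key-group size estimate (the multiplication itself is not shown in the hunk). The range arithmetic mirrors Flink's `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex`; `LocalityScore` and its methods are hypothetical:

```java
import java.util.Optional;

final class LocalityScore {
    /** Inclusive key-group range {start, end} owned by one subtask. */
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        // Same arithmetic as KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex.
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Number of key groups present in both inclusive ranges (0 if they do not intersect). */
    static int overlap(int[] a, int[] b) {
        int start = Math.max(a[0], b[0]);
        int end = Math.min(a[1], b[1]);
        return Math.max(0, end - start + 1);
    }

    /**
     * Assumed score: local key groups times estimated state size per key group. A missing
     * estimate (no keyed state in the latest checkpoint) contributes nothing; zero counts as 1.
     */
    static long score(int[] newRange, int[] previousRange, Optional<Long> kgSizeEstimate) {
        if (!kgSizeEstimate.isPresent()) {
            return 0L;
        }
        long kgSize = Math.max(kgSizeEstimate.get(), 1L);
        return overlap(newRange, previousRange) * kgSize;
    }
}
```

For example, rescaling from 2 to 4 subtasks with `maxParallelism = 128`, new subtask 1 owns key groups 32..63, which lie entirely inside old subtask 0's range 0..63, so 32 key groups stay local.
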






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116364006


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java:
##
@@ -54,6 +54,18 @@ public interface SlotAllocator {
     Optional determineParallelism(
             JobInformation jobInformation, Collection slots);
 
+    /**
+     * Same as {@link #determineParallelism(JobInformation, Collection)} but additionally determine
+     * assignment of slots to execution slot sharing groups.
+     */
+    default Optional

Review Comment:
   After an offline discussion, I restructured the interfaces (4fa93e3756fad77cf2dbc92bf77614aa39cc28fe..2fc84d83884c1f342d46fec02d961d4f73eb4219).
   
   This particular comment is addressed by 4fa93e3756fad77cf2dbc92bf77614aa39cc28fe:
   ```
   Restructure 1/5: change types passed between AdaptiveScheduler and SlotAssigner
   
   Slot assignments are computed and consumed by SlotAllocator.
   This is expressed implicitly by implementing VertexParallelism.
   
   This change tries to make that clear, while still allowing to assign
   slots to something other than slot sharing groups.
   
   It does so by:
   1. Introduce JobSchedulingPlan, computed and consumed by SlotAllocator. It couples VertexParallelism with slot assignments.
   2. Push the polymorphism of state assignments from VertexParallelism to slot assignment target in JobSchedulingPlan.
   ```
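
The commit message's central point, that one component both computes and later consumes the plan, can be sketched with a toy allocator. Everything here (`Plan`, `PlanningAllocator`, `SimpleAllocator`, string slot IDs, a single vertex named `"vertex"`) is a hypothetical simplification, not the actual `SlotAllocator` interface:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical plan: parallelism per vertex coupled with the slots chosen for it. */
final class Plan {
    final Map<String, Integer> parallelismPerVertex;
    final List<String> assignedSlots;

    Plan(Map<String, Integer> parallelismPerVertex, List<String> assignedSlots) {
        this.parallelismPerVertex = Collections.unmodifiableMap(parallelismPerVertex);
        this.assignedSlots = Collections.unmodifiableList(assignedSlots);
    }
}

/** Computes a plan from the currently free slots, then later tries to reserve exactly those. */
interface PlanningAllocator {
    Optional<Plan> determinePlan(List<String> freeSlots);

    boolean tryReserve(Plan plan);
}

/** Toy implementation for a job with a single vertex. */
final class SimpleAllocator implements PlanningAllocator {
    private final int maxParallelism;
    private final Set<String> available;

    SimpleAllocator(int maxParallelism, Collection<String> available) {
        this.maxParallelism = maxParallelism;
        this.available = new HashSet<>(available);
    }

    @Override
    public Optional<Plan> determinePlan(List<String> freeSlots) {
        int parallelism = Math.min(maxParallelism, freeSlots.size());
        if (parallelism == 0) {
            return Optional.empty(); // not enough resources to run anything
        }
        Map<String, Integer> vertexParallelism = new HashMap<>();
        vertexParallelism.put("vertex", parallelism);
        return Optional.of(
                new Plan(vertexParallelism, new ArrayList<>(freeSlots.subList(0, parallelism))));
    }

    @Override
    public boolean tryReserve(Plan plan) {
        // Slots may have disappeared between planning and reservation.
        if (!available.containsAll(plan.assignedSlots)) {
            return false;
        }
        available.removeAll(plan.assignedSlots);
        return true;
    }
}
```

Because the same object produces and interprets the plan, the parallelism and the slot choices cannot drift apart between the two steps.
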
   






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116362299


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+    private static class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String group;
+        private final AllocationID allocationId;
+
+        public AllocationScore(String group, AllocationID allocationId, int score) {
+            this.group = group;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        private final int score;
+
+        public String getGroup() {
+            return group;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public int getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+            int result = Integer.compare(score, other.score);
+            if (result != 0) {
+                return result;
+            }
+            result = other.allocationId.compareTo(allocationId);
+            if (result != 0) {
+                return result;
+            }
+            return other.group.compareTo(group);
+        }
+    }
+
+    private final Map> locality;
+    private final Map<JobVertexID, Integer> maxParallelism;
+
+    public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+        this(
+                calculateLocalKeyGroups(archivedExecutionGraph),
+                StreamSupport.stream(
+                                archivedExecutionGraph.getVerticesTopologically().spliterator(),
+                                false)
+                        .collect(
+                                toMap(
+                                        ExecutionJobVertex::getJobVertexId,
+                                        ExecutionJobVertex::getMaxParallelism)));
+    }
+
+    public StateLocalitySlotAssigner(
+            Map> locality,
+            Map<JobVertexID, Integer> maxParallelism) {
+        this.locality = locality;
+        this.maxParallelism = maxParallelism;
+    }
+
+    @Override
+    public AssignmentResult assignSlots(
+            Collection slots, Collection<ExecutionSlotSharingGroup> groups) {
+
+        final Map<JobVertexID, Integer> parallelism = new HashMap<>();
+        groups.forEach(
+                group ->
+                        group.getContainedExecutionVertices()
+                                .forEach(
+                                        evi ->
+                                                parallelism.merge(
+

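Two idioms in this revision are worth spelling out. `getVerticesTopologically()` is consumed through its `spliterator()`, i.e. as an `Iterable`, which has no `stream()` method, hence `StreamSupport.stream(..., false)`. The per-vertex parallelism is accumulated with `Map.merge`; since the hunk is cut off at `parallelism.merge(`, the `(key, 1, Integer::sum)` arguments below are an assumption. A self-contained sketch with hypothetical stand-in types (`VertexMaps`, `JobVertex`):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.toMap;

final class VertexMaps {
    /** Hypothetical stand-in for ExecutionJobVertex. */
    static final class JobVertex {
        private final String id;
        private final int maxParallelism;

        JobVertex(String id, int maxParallelism) {
            this.id = id;
            this.maxParallelism = maxParallelism;
        }

        String getId() {
            return id;
        }

        int getMaxParallelism() {
            return maxParallelism;
        }
    }

    /** An Iterable has no stream(); StreamSupport builds a sequential stream from its spliterator. */
    static Map<String, Integer> maxParallelismById(Iterable<JobVertex> vertices) {
        return StreamSupport.stream(vertices.spliterator(), false)
                // toMap throws IllegalStateException if two vertices share an ID.
                .collect(toMap(JobVertex::getId, JobVertex::getMaxParallelism));
    }

    /** Counts subtasks per vertex: each list element is the vertex ID of one subtask. */
    static Map<String, Integer> parallelismById(List<String> subtaskVertexIds) {
        Map<String, Integer> parallelism = new HashMap<>();
        for (String vertexId : subtaskVertexIds) {
            // Insert 1 for a new key, otherwise add 1 to the existing count.
            parallelism.merge(vertexId, 1, Integer::sum);
        }
        return parallelism;
    }
}
```
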
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116362006


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116360889


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+    private final Map> locality;
+    private final Map<JobVertexID, Integer> maxParallelism;
+
+    public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+        this(
+                calculateLocalKeyGroups(archivedExecutionGraph),
+                StreamSupport.stream(
+                                archivedExecutionGraph.getVerticesTopologically().spliterator(),
+                                false)
+                        .collect(
+                                toMap(
+                                        ExecutionJobVertex::getJobVertexId,
+                                        ExecutionJobVertex::getMaxParallelism)));

Review Comment:
   Fixed by addressing the [comment above](https://github.com/apache/flink/pull/21981#discussion_r1112862609).






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116359613


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116359318


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements 
Comparable {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, int 
score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final int score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public int getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Integer.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+private final Map> locality;
+private final Map maxParallelism;
+
+public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+this(
+calculateLocalKeyGroups(archivedExecutionGraph),
+StreamSupport.stream(
+
archivedExecutionGraph.getVerticesTopologically().spliterator(),
+false)
+.collect(
+toMap(
+ExecutionJobVertex::getJobVertexId,
+ExecutionJobVertex::getMaxParallelism)));
+}
+
+public StateLocalitySlotAssigner(
+Map> locality,
+Map maxParallelism) {
+this.locality = locality;
+this.maxParallelism = maxParallelism;
+}
+
+@Override
+public AssignmentResult assignSlots(
+Collection slots, Collection groups) {
+
+final Map parallelism = new HashMap<>();
+groups.forEach(
+group ->
+group.getContainedExecutionVertices()
+.forEach(
+evi ->
+parallelism.merge(
+
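The Javadoc above says slots are assigned "based on the number of local key groups", and `AllocationScore` is ordered by a score with the allocation ID and group as tie-breakers, which suggests a priority-queue-driven greedy matching. The following is a minimal sketch of such a scheme under stated assumptions, not the code under review: groups and slots are plain `String` IDs, scores are precomputed, and `GreedyLocalityAssigner` and `Candidate` are hypothetical names. Every (group, slot) pair goes into a max-heap, and the best pair whose group and slot are both still free wins.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical sketch of greedy, score-based slot assignment (not Flink's implementation). */
final class GreedyLocalityAssigner {

    /** One candidate pairing of an execution group with a slot, plus its locality score. */
    static final class Candidate {
        final String group;
        final String slot;
        final int score;

        Candidate(String group, String slot, int score) {
            this.group = group;
            this.slot = slot;
            this.score = score;
        }
    }

    /**
     * Returns group -> slot. Pairs are taken in descending score order; each group and each
     * slot is used at most once. Groups without local state score 0 everywhere and simply
     * take whatever slots are left.
     */
    static Map<String, String> assign(
            List<String> groups, List<String> slots, Map<String, Map<String, Integer>> scores) {
        // Max-heap on score: the pair with the most local key groups is polled first.
        PriorityQueue<Candidate> queue =
                new PriorityQueue<>(Comparator.comparingInt((Candidate c) -> c.score).reversed());
        for (String group : groups) {
            Map<String, Integer> perSlot = scores.getOrDefault(group, Map.of());
            for (String slot : slots) {
                queue.add(new Candidate(group, slot, perSlot.getOrDefault(slot, 0)));
            }
        }
        Map<String, String> assignment = new HashMap<>();
        Set<String> usedSlots = new HashSet<>();
        while (!queue.isEmpty() && assignment.size() < groups.size()) {
            Candidate best = queue.poll();
            if (!assignment.containsKey(best.group) && !usedSlots.contains(best.slot)) {
                assignment.put(best.group, best.slot);
                usedSlots.add(best.slot);
            }
        }
        return assignment;
    }
}
```

Greedy matching is not guaranteed to maximize the total number of local key groups, but it is simple and costs roughly O(G * S * log(G * S)) for G groups and S slots.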

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116358878


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java:
##
@@ -163,7 +165,12 @@ CompletableFuture goToStopWithSavepoint(
 /** Interface covering transition to the {@link WaitingForResources} state. */
 interface ToWaitingForResources extends StateTransitions {
 
-/** Transitions into the {@link WaitingForResources} state. */
+/**
+ * Transitions into the {@link WaitingForResources} state without {@link ExecutionGraph}
+ * (e.g. after creation).
+ */
 void goToWaitingForResources();
+/** Transitions into the {@link WaitingForResources} state (e.g. after restarting). */
+void goToWaitingForResources(ExecutionGraph executionGraph);

Review Comment:
   Removed in 3760a182422db9a7b7cabf420f170346617455fa.






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116358657


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements Comparable<StateLocalitySlotAssigner.AllocationScore> {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, int score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final int score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public int getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Integer.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+private final Map> locality;
+private final Map maxParallelism;
+
+public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+this(
+calculateLocalKeyGroups(archivedExecutionGraph),
+StreamSupport.stream(
+archivedExecutionGraph.getVerticesTopologically().spliterator(),
+false)
+.collect(
+toMap(
+ExecutionJobVertex::getJobVertexId,
+ExecutionJobVertex::getMaxParallelism)));
+}
+
+public StateLocalitySlotAssigner(
+Map> locality,
+Map maxParallelism) {
+this.locality = locality;
+this.maxParallelism = maxParallelism;
+}
+
+@Override
+public AssignmentResult assignSlots(
+Collection slots, Collection groups) {
+
+final Map parallelism = new HashMap<>();
+groups.forEach(
+group ->
+group.getContainedExecutionVertices()
+.forEach(
+evi ->
+parallelism.merge(
+
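The imports of `KeyGroupRange` and `KeyGroupRangeAssignment` suggest that a slot's score reflects how many of a new subtask's key groups were already held by that slot's previous allocation. A self-contained sketch of that idea, assuming ranges are represented as inclusive `int[] {start, end}` pairs; `KeyGroupOverlap` is a hypothetical helper, and `rangeFor` re-implements, as far as we know, the arithmetic of Flink's `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex` rather than calling it.

```java
/** Hypothetical helper: key-group ranges and their overlap, used as a locality score. */
final class KeyGroupOverlap {

    /** Inclusive [start, end] key-group range owned by one subtask (assumed formula). */
    static int[] rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
        }
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Number of key groups present in both inclusive ranges; 0 if they are disjoint. */
    static int overlap(int[] a, int[] b) {
        int lo = Math.max(a[0], b[0]);
        int hi = Math.min(a[1], b[1]);
        return Math.max(0, hi - lo + 1);
    }

    /** Score of placing new subtask {@code newIndex} where old subtask {@code oldIndex} ran. */
    static int score(int maxParallelism, int oldParallelism, int oldIndex, int newParallelism, int newIndex) {
        return overlap(
                rangeFor(maxParallelism, oldParallelism, oldIndex),
                rangeFor(maxParallelism, newParallelism, newIndex));
    }
}
```

For example, when rescaling from parallelism 2 to 4 with `maxParallelism` 128, old subtask 0 owned key groups 0..63 and new subtask 1 owns 32..63, so the slot that hosted old subtask 0 scores 32 for new subtask 1, while the slot of old subtask 1 (64..127) scores 0.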

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116358477


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116356436


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##
@@ -794,7 +802,23 @@ public void goToWaitingForResources() {
 LOG,
 desiredResources,
 this.initialResourceAllocationTimeout,
-this.resourceStabilizationTimeout));
+this.resourceStabilizationTimeout,
+null));
+}
+
+@Override
+public void goToWaitingForResources(ExecutionGraph executionGraph) {

Review Comment:
   I agree, `state.as` seems error-prone to me. The initial implementation had an error related to that (the state was never the expected one).
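To illustrate the pitfall, here is a simplified model with hypothetical types (not Flink's actual `State` interface): a helper that narrows the current state with an `as(Class)`-style lookup silently finds nothing when the state is of an unexpected type, whereas passing the previous graph explicitly, as the `goToWaitingForResources(ExecutionGraph)` overload quoted earlier in this thread does, cannot miss. Graphs are modelled as plain strings.

```java
import java.util.Optional;

/** Hypothetical, simplified model of the two approaches; graphs are plain strings. */
final class TransitionSketch {

    interface State {
        /** Returns this state viewed as {@code clazz}, or empty if it is of another type. */
        default <T> Optional<T> as(Class<T> clazz) {
            return clazz.isInstance(this) ? Optional.of(clazz.cast(this)) : Optional.empty();
        }
    }

    static final class Executing implements State {
        final String graph;

        Executing(String graph) {
            this.graph = graph;
        }
    }

    static final class Restarting implements State {
        final String graph;

        Restarting(String graph) {
            this.graph = graph;
        }
    }

    /** Error-prone: only finds a graph if the current state happens to be Executing. */
    static Optional<String> previousGraphViaCast(State current) {
        return current.as(Executing.class).map(executing -> executing.graph);
    }

    /** Explicit: the caller hands the graph over; null means "no previous graph". */
    static Optional<String> previousGraphExplicit(String previousGraphOrNull) {
        return Optional.ofNullable(previousGraphOrNull);
    }
}
```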






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116355057


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##
@@ -121,16 +133,13 @@ public Optional determineParallelism(
 slotSharingGroupParallelism.get(
 slotSharingGroup.getSlotSharingGroupId()));
 
-final Iterable<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
+final List<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
 createExecutionSlotSharingGroups(vertexParallelism);
 
-for (ExecutionSlotSharingGroup executionSlotSharingGroup :
-sharedSlotToVertexAssignment) {
-final SlotInfo slotInfo = slotIterator.next();
-
-assignments.add(
-new ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
-}
+SlotAssigner.AssignmentResult result =
+slotAssigner.assignSlots(freeSlots, sharedSlotToVertexAssignment);
+assignments.addAll(result.assignments);
+freeSlots = result.remainingSlots;

Review Comment:
   Fixed by restructuring the code and going through all the groups in one go: 2fc84d83884c1f342d46fec02d961d4f73eb4219..4fa93e3756fad77cf2dbc92bf77614aa39cc28fe.
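One plausible reading of "going through all the groups in one go" is: first collect the execution slot sharing groups of every slot sharing group, then call the assigner once with all free slots, instead of once per slot sharing group. A generic sketch of that shape, where `OnePassAssignment` and the `BiFunction`-based assigner are hypothetical stand-ins for `SlotAssigner#assignSlots`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical sketch: gather all groups first, then assign slots in a single call. */
final class OnePassAssignment {

    /**
     * Flattens the execution groups of every slot sharing group and hands the whole list,
     * together with the full set of free slots, to one assigner invocation.
     */
    static <S, G, R> R assignAll(
            List<S> freeSlots,
            List<List<G>> groupsPerSlotSharingGroup,
            BiFunction<List<S>, List<G>, R> assigner) {
        List<G> allGroups = new ArrayList<>();
        for (List<G> groups : groupsPerSlotSharingGroup) {
            allGroups.addAll(groups);
        }
        return assigner.apply(freeSlots, allGroups);
    }
}
```

With a single call, the assigner can rank all candidate pairs together and build its priority queue once, which avoids re-enqueuing leftover free slots on every loop iteration.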






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-21 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1113505496


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java:
##
@@ -54,6 +54,18 @@ public interface SlotAllocator {
 Optional determineParallelism(
 JobInformation jobInformation, Collection slots);
 
+/**
+ * Same as {@link #determineParallelism(JobInformation, Collection)} but additionally determine
+ * assignment of slots to execution slot sharing groups.
+ */
+default Optional

Review Comment:
   I agree that slot sharing is an implementation detail.
   However, one difference that I'd like to expose is that `VertexParallelismWithSlotSharing` contains assignments, while `VertexParallelism` is only about parallelism.
   
   I think the right way to do that is to parameterize `SlotAllocator` with this type and let `SlotSharingSlotAllocator` use `VertexParallelismWithSlotSharing`:
   ```
   interface SlotAllocator<A> {
     VertexParallelism determineParallelism(); // for hasResources, canScaleUp
     A determineAssignments(...); // for tryReserveResources later
     tryReserveResources(A);
   }
   ```
   WDYT?
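A compilable rendering of this proposal, for illustration only: every type below is a simplified stand-in (slots are `String` IDs, a single vertex `"v"`), `Parallelism` and `ParallelismWithAssignments` play the roles of `VertexParallelism` and `VertexParallelismWithSlotSharing`, and the `boolean` return of `tryReserveResources` is an assumption, since the proposal leaves it open.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical rendering of the proposal; every type here is a simplified stand-in. */
final class ParameterizedAllocatorSketch {

    /** Stand-in for VertexParallelism: only vertex -> parallelism. */
    static class Parallelism {
        final Map<String, Integer> perVertex;

        Parallelism(Map<String, Integer> perVertex) {
            this.perVertex = perVertex;
        }
    }

    /** Stand-in for VertexParallelismWithSlotSharing: parallelism plus group -> slot. */
    static final class ParallelismWithAssignments extends Parallelism {
        final Map<String, String> groupToSlot;

        ParallelismWithAssignments(Map<String, Integer> perVertex, Map<String, String> groupToSlot) {
            super(perVertex);
            this.groupToSlot = groupToSlot;
        }
    }

    /** A is the allocator-specific result that later drives the reservation. */
    interface SlotAllocator<A> {
        Optional<Parallelism> determineParallelism(Collection<String> slots); // hasResources, canScaleUp

        Optional<A> determineAssignments(Collection<String> slots); // for tryReserveResources later

        boolean tryReserveResources(A assignments);
    }

    /** Toy allocator: a single vertex "v" and one execution group per free slot. */
    static final class SlotSharingAllocator implements SlotAllocator<ParallelismWithAssignments> {
        @Override
        public Optional<Parallelism> determineParallelism(Collection<String> slots) {
            if (slots.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(new Parallelism(Map.of("v", slots.size())));
        }

        @Override
        public Optional<ParallelismWithAssignments> determineAssignments(Collection<String> slots) {
            if (slots.isEmpty()) {
                return Optional.empty();
            }
            Map<String, String> groupToSlot = new HashMap<>();
            int index = 0;
            for (String slot : slots) {
                groupToSlot.put("group-" + index++, slot);
            }
            return Optional.of(new ParallelismWithAssignments(Map.of("v", slots.size()), groupToSlot));
        }

        @Override
        public boolean tryReserveResources(ParallelismWithAssignments assignments) {
            // Toy reservation: succeeds when every group of vertex "v" has a slot.
            return assignments.groupToSlot.size() == assignments.perVertex.get("v");
        }
    }
}
```

The benefit is that the assignments computed by `determineAssignments` reach `tryReserveResources` with their concrete type checked by the compiler, while `determineParallelism` stays available for cheap checks such as `hasResources` and `canScaleUp`.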
   
   






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-21 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1112940402


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-21 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1112921579


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-21 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1112741455


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##
@@ -121,16 +133,13 @@ public Optional determineParallelism(
 slotSharingGroupParallelism.get(
 slotSharingGroup.getSlotSharingGroupId()));
 
-final Iterable<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
+final List<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
 createExecutionSlotSharingGroups(vertexParallelism);
 
-for (ExecutionSlotSharingGroup executionSlotSharingGroup :
-sharedSlotToVertexAssignment) {
-final SlotInfo slotInfo = slotIterator.next();
-
-assignments.add(
-new ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
-}
+SlotAssigner.AssignmentResult result =
+slotAssigner.assignSlots(freeSlots, sharedSlotToVertexAssignment);
+assignments.addAll(result.assignments);
+freeSlots = result.remainingSlots;

Review Comment:
   Because `assignSlots` is called in a loop and internally iterates over the `freeSlots`, the complexity here is suboptimal: lower-priority slots are added to the queue inside `assignSlots` over and over again, resulting in quadratic complexity.
   
   I was thinking about making the free slots part of the `SlotAssigner` state (passed into the constructor).
   
   WDYT, @dmvk?
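A small model of the cost described above, assuming each call builds a fresh priority queue from all remaining free slots, takes some of them, and returns the lower-priority rest to the pool for the next call. `RepeatedQueueCost` is hypothetical and only counts heap insertions; it does not reproduce `assignSlots` itself.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical cost model: counts heap insertions; slots are modelled as integer priorities. */
final class RepeatedQueueCost {

    /**
     * Simulates {@code calls} assignment rounds. Each round re-enqueues every remaining free
     * slot into a new max-heap, takes the {@code takenPerCall} best ones, and leaves the rest
     * (the lower-priority slots) in the free pool for the next round.
     */
    static long insertions(int freeSlots, int calls, int takenPerCall) {
        List<Integer> free = new ArrayList<>();
        for (int i = 0; i < freeSlots; i++) {
            free.add(i);
        }
        long count = 0;
        for (int round = 0; round < calls; round++) {
            PriorityQueue<Integer> queue = new PriorityQueue<>(Comparator.reverseOrder());
            for (Integer slot : free) {
                queue.add(slot);
                count++;
            }
            for (int i = 0; i < takenPerCall && !queue.isEmpty(); i++) {
                queue.poll(); // this slot is now assigned
            }
            free = new ArrayList<>(queue); // leftovers go back to the pool
        }
        return count;
    }
}
```

With n free slots and one slot taken per call, this is n + (n - 1) + ... + 1 = n(n + 1)/2 insertions (5050 for n = 100), versus n when the queue is built once, for example by keeping the free slots in the assigner's state or by assigning all groups in a single call.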


