Izeren commented on code in PR #26663:
URL: https://github.com/apache/flink/pull/26663#discussion_r2156660743


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java:
##########
@@ -74,6 +78,56 @@ public void testDownScaleWithUnevenStateSize() {
         verifyAssignments(assignments, newParallelism, allocationWith200bytes);
     }
 
+    @Test
+    // In case of local recovery, we want to preserve slot allocations even if 
there is no
+    // keyed managed state available.
+    public void testSlotsPreservationWithNoStateSameParallelism() {
+        final int parallelism = 2;
+        final VertexInformation vertex = createVertex(parallelism);
+        final AllocationID allocationID1 = new AllocationID();
+        final AllocationID allocationID2 = new AllocationID();
+
+        final List<VertexAllocationInformation> previousAllocations =
+                Arrays.asList(
+                        new VertexAllocationInformation(
+                                allocationID1, vertex.getJobVertexID(), 
KeyGroupRange.of(0, 63), 0),
+                        new VertexAllocationInformation(
+                                allocationID2,
+                                vertex.getJobVertexID(),
+                                KeyGroupRange.of(64, 127),
+                                0));
+
+        final Collection<SlotAssignment> assignments =
+                assign(
+                        vertex,
+                        // Providing allocation IDs in reverse order to check 
that assigner fixes
+                        // the order based on previous allocations.
+                        Arrays.asList(allocationID2, allocationID1),
+                        previousAllocations);
+
+        // Extract allocation IDs from assignments sorted by subtask index.
+        final List<AllocationID> subtaskOrderedNewAllocations =
+                assignments.stream()
+                        .sorted(
+                                Comparator.comparingInt(
+                                        assignment ->
+                                                assignment
+                                                        .getTargetAs(
+                                                                
SlotSharingSlotAllocator
+                                                                        
.ExecutionSlotSharingGroup
+                                                                        .class)
+                                                        
.getContainedExecutionVertices()
+                                                        .stream()
+                                                        .mapToInt(
+                                                                
ExecutionVertexID::getSubtaskIndex)
+                                                        .findAny()
+                                                        .orElseThrow()))
+                        .map(assignment -> 
assignment.getSlotInfo().getAllocationId())
+                        .collect(Collectors.toList());
+
+        assertThat(subtaskOrderedNewAllocations, contains(allocationID1, 
allocationID2));

Review Comment:
   Noted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to