Izeren commented on code in PR #26663: URL: https://github.com/apache/flink/pull/26663#discussion_r2156660743
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java: ########## @@ -74,6 +78,56 @@ public void testDownScaleWithUnevenStateSize() { verifyAssignments(assignments, newParallelism, allocationWith200bytes); } + @Test + // In case of local recovery, we want to preserve slot allocations even if there is no + // keyed managed state available. + public void testSlotsPreservationWithNoStateSameParallelism() { + final int parallelism = 2; + final VertexInformation vertex = createVertex(parallelism); + final AllocationID allocationID1 = new AllocationID(); + final AllocationID allocationID2 = new AllocationID(); + + final List<VertexAllocationInformation> previousAllocations = + Arrays.asList( + new VertexAllocationInformation( + allocationID1, vertex.getJobVertexID(), KeyGroupRange.of(0, 63), 0), + new VertexAllocationInformation( + allocationID2, + vertex.getJobVertexID(), + KeyGroupRange.of(64, 127), + 0)); + + final Collection<SlotAssignment> assignments = + assign( + vertex, + // Providing allocation IDs in reverse order to check that assigner fixes + // the order based on previous allocations. + Arrays.asList(allocationID2, allocationID1), + previousAllocations); + + // Extract allocation IDs from assignments sorted by subtask index. + final List<AllocationID> subtaskOrderedNewAllocations = + assignments.stream() + .sorted( + Comparator.comparingInt( + assignment -> + assignment + .getTargetAs( + SlotSharingSlotAllocator + .ExecutionSlotSharingGroup + .class) + .getContainedExecutionVertices() + .stream() + .mapToInt( + ExecutionVertexID::getSubtaskIndex) + .findAny() + .orElseThrow())) + .map(assignment -> assignment.getSlotInfo().getAllocationId()) + .collect(Collectors.toList()); + + assertThat(subtaskOrderedNewAllocations, contains(allocationID1, allocationID2)); Review Comment: Noted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org