StephanEwen commented on a change in pull request #14259:
URL: https://github.com/apache/flink/pull/14259#discussion_r532764178



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
##########
@@ -114,15 +113,20 @@ public void recordSplitAssignment(SplitsAssignment<SplitT> splitsAssignment) {
         * if those splits were never assigned. To handle this case, the coordinator needs to find those
         * splits and return them back to the SplitEnumerator for re-assignment.
         *
-        * @param failedSubtaskId the failed subtask id.
+        * @param subtaskId the subtask id of the reader that failed over.
+        * @param restoredCheckpointId the ID of the checkpoint that the reader was restored to.
         * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
         */
-       public List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
-               List<SplitT> splits = new ArrayList<>();
-               assignmentsByCheckpointId.values().forEach(assignments -> {
-                       removeFromAssignment(failedSubtaskId, assignments, splits);
-               });
-               removeFromAssignment(failedSubtaskId, uncheckpointedAssignments, splits);
+       public List<SplitT> getAndRemoveUncheckpointedAssignment(int subtaskId, long restoredCheckpointId) {
+               final ArrayList<SplitT> splits = new ArrayList<>();
+
+               for (final Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> entry : assignmentsByCheckpointId.entrySet()) {

Review comment:
       I actually had that code initially, but the lower-bound key of `tailMap()` 
is inclusive (anything >= the checkpoint ID), and we need a tail map whose lower 
bound is exclusive. Compensating for that ends up making the code more complex 
than this version here.
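
To illustrate the point about the inclusive lower bound, here is a minimal standalone sketch. It is not the PR's code: the class `TailMapDemo`, its two helper methods, and the `String` values are hypothetical, and it assumes the map is typed as a `java.util.SortedMap` keyed by checkpoint ID. It contrasts the single-argument `tailMap()` with a plain loop that uses a strict comparison.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical demo class, not part of Flink.
public class TailMapDemo {

    // SortedMap.tailMap(fromKey) returns entries with key >= fromKey,
    // so the restored checkpoint itself is included in the view.
    static List<Long> keysViaTailMap(SortedMap<Long, String> byCheckpoint, long restoredCheckpointId) {
        return new ArrayList<>(byCheckpoint.tailMap(restoredCheckpointId).keySet());
    }

    // A plain loop with a strict comparison keeps only checkpoints
    // strictly newer than the restored one (exclusive lower bound).
    static List<Long> keysViaLoop(SortedMap<Long, String> byCheckpoint, long restoredCheckpointId) {
        List<Long> keys = new ArrayList<>();
        for (Map.Entry<Long, String> entry : byCheckpoint.entrySet()) {
            if (entry.getKey() > restoredCheckpointId) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        SortedMap<Long, String> byCheckpoint = new TreeMap<>();
        byCheckpoint.put(1L, "assignments@1");
        byCheckpoint.put(2L, "assignments@2");
        byCheckpoint.put(3L, "assignments@3");

        System.out.println(keysViaTailMap(byCheckpoint, 2L)); // prints [2, 3]
        System.out.println(keysViaLoop(byCheckpoint, 2L));    // prints [3]
    }
}
```

With a restored checkpoint ID of 2, the `tailMap()` view still contains checkpoint 2, whereas the loop only visits checkpoint 3. One could compensate by calling `tailMap(restoredCheckpointId + 1)`, which relies on the keys being integral; the comment above judges such workarounds less readable than the explicit loop.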



