rkhachatryan commented on code in PR #22584:
URL: https://github.com/apache/flink/pull/22584#discussion_r1193902234


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##########
@@ -136,19 +136,24 @@ public void assignStates() {
 
         // repartition state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            if (stateAssignment.hasNonFinishedState) {
+            if (stateAssignment.hasNonFinishedState
+                    // FLINK-31963: We need to run repartitioning for stateless operators that have
+                    // upstream output or downstream input states.
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {
                 assignAttemptState(stateAssignment);
             }
         }
 
         // actually assign the state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            // If upstream has output states, even the empty task state should be assigned for the
-            // current task in order to notify this task that the old states will send to it which
-            // likely should be filtered.
+            // If upstream has output states or downstream has input states, even the empty task
+            // state should be assigned for the current task in order to notify this task that the
+            // old states will send to it which likely should be filtered.
             if (stateAssignment.hasNonFinishedState
                     || stateAssignment.isFullyFinished
-                    || stateAssignment.hasUpstreamOutputStates()) {
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {

Review Comment:
   Should we compute `hasUpstreamOutputStates` and `hasDownstreamInputStates` once, similar to `isFullyFinished`?
   (that would at least be consistent, and also faster)
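
   A minimal sketch of what "compute once" could look like, assuming the check scans neighbouring assignments on every call. `AssignmentFlagsSketch`, its `boolean[]` input, and the `scans` counter are hypothetical stand-ins for illustration, not the real `TaskStateAssignment` API. Lazy caching is only one option; a field set in the constructor would work too if the neighbours are already known at that point.

```java
/** Hypothetical stand-in for TaskStateAssignment; not the real Flink class. */
final class AssignmentFlagsSketch {
    // Assumption: one flag per upstream assignment, true if it has output state.
    private final boolean[] upstreamHasOutputState;

    // null until the first call; afterwards holds the cached answer.
    private Boolean cachedHasUpstreamOutputStates;

    // Counts how often the scan actually runs (for illustration only).
    int scans;

    AssignmentFlagsSketch(boolean... upstreamHasOutputState) {
        this.upstreamHasOutputState = upstreamHasOutputState.clone();
    }

    boolean hasUpstreamOutputStates() {
        if (cachedHasUpstreamOutputStates == null) {
            scans++;
            boolean any = false;
            for (boolean hasState : upstreamHasOutputState) {
                any |= hasState;
            }
            cachedHasUpstreamOutputStates = any;
        }
        return cachedHasUpstreamOutputStates;
    }

    public static void main(String[] args) {
        AssignmentFlagsSketch assignment = new AssignmentFlagsSketch(false, true);
        // Both loops in assignStates() would hit the cache after the first call.
        boolean first = assignment.hasUpstreamOutputStates();
        boolean second = assignment.hasUpstreamOutputStates();
        System.out.println(first + " " + second + " scans=" + assignment.scans);
    }
}
```

   The same pattern would apply to `hasDownstreamInputStates`. The cache is only safe if the neighbouring assignments no longer change between the first call and the later ones.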


