ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1609545011


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##########
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+                        && 
stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            
.get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == 
taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   Earlier, I misjudged the error of not noticing the negation in the 
condition. 
   But by substituting `anyMatch` for `allMatch`, you can see that the test 
will stop passing.
   Also I changed gateIdx from random generation to always zero (in 
`StateHandleDummyUtil#createNewInputChannelStateHandle`). This change have no 
affects for other tests. But I need zero for this test, because you need 
stability for the number of input gates



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to