ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1609545011
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ########## @@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment) stateAssignment.oldState.get(stateAssignment.inputOperatorID); final List<List<InputChannelStateHandle>> inputOperatorState = splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState); - if (inputState.getParallelism() == executionJobVertex.getParallelism()) { + + boolean noNeedRescale = + stateAssignment.executionJobVertex.getJobVertex().getInputs().stream() + .map(JobEdge::getDownstreamSubtaskStateMapper) + .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL)) + && stateAssignment.executionJobVertex.getInputs().stream() + .map(IntermediateResult::getProducer) + .map(vertexAssignments::get) + .anyMatch( + taskStateAssignment -> { + final int oldParallelism = + stateAssignment + .oldState + .get(stateAssignment.inputOperatorID) + .getParallelism(); + return oldParallelism + == taskStateAssignment.executionJobVertex + .getParallelism(); + }); Review Comment: Earlier, I misjudged the error of not noticing the negation in the condition. But by substituting `anyMatch` for `allMatch`, you can see that the test will stop passing. Also I changed gateIdx from random generation to always zero (in `StateHandleDummyUtil#createNewInputChannelStateHandle`). This change have no affects for other tests. But I need zero for this test, because you need stability for the number of input gates -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org