Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski closed pull request #24784: [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… URL: https://github.com/apache/flink/pull/24784 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on PR #24784: URL: https://github.com/apache/flink/pull/24784#issuecomment-2136510387

> Thanks for the update @ldadima. I have fixed up a couple of things in #24857 so let me close this PR in favour of that one. Could you take a look at my version?

I checked #24857. I agree with the changes to the ITCase. In my case I forgot about the uid, which is why the map vertex didn't work for me in the rescale case. Thanks for the changes.
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on PR #24784: URL: https://github.com/apache/flink/pull/24784#issuecomment-2135614198

Thanks for the update @ldadima. I have fixed up a couple of things in https://github.com/apache/flink/pull/24857 so let me close this PR in favour of that one. Could you take a look at my version?
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1609545011

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ##

```diff
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });
```

Review Comment:
Earlier I misjudged this because I didn't notice the negation in the condition. But if you replace `anyMatch` with `allMatch`, you can see that the test stops passing. I also changed `gateIdx` from a randomly generated value to always zero (in `StateHandleDummyUtil#createNewInputChannelStateHandle`). This change has no effect on other tests, but this test needs zero because the number of input gates has to be stable.
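The negation is easy to miss because `anyMatch(m -> !isFull(m))` and `!anyMatch(m -> isFull(m))` read alike but mean different things. Below is a minimal, self-contained sketch (not Flink code): `Mapper` is a hypothetical stand-in for `SubtaskStateMapper` that borrows a few of its constant names, and the helper method names are made up for illustration. It shows that the `anyMatch`, `allMatch` and negated variants only disagree when a vertex mixes FULL and non-FULL inputs.

```java
import java.util.List;

/** Hypothetical model of the FULL-mapper check from the quoted hunk; not Flink code. */
class MapperMatchDemo {
    // Stand-in for SubtaskStateMapper; only the FULL / non-FULL distinction matters here.
    enum Mapper { FULL, ROUND_ROBIN, RANGE }

    // Shape used in the hunk: true if at least one input is NOT FULL.
    static boolean anyNonFull(List<Mapper> inputs) {
        return inputs.stream().anyMatch(m -> m != Mapper.FULL);
    }

    // allMatch variant: true only if every input is non-FULL.
    static boolean allNonFull(List<Mapper> inputs) {
        return inputs.stream().allMatch(m -> m != Mapper.FULL);
    }

    // Negation outside the stream: true if no input is FULL.
    // By De Morgan's law this always equals allNonFull(inputs).
    static boolean notAnyFull(List<Mapper> inputs) {
        return !inputs.stream().anyMatch(m -> m == Mapper.FULL);
    }

    public static void main(String[] args) {
        List<List<Mapper>> cases = List.of(
                List.of(Mapper.FULL, Mapper.FULL),
                List.of(Mapper.ROUND_ROBIN, Mapper.RANGE),
                List.of(Mapper.FULL, Mapper.ROUND_ROBIN)); // the mixed case
        for (List<Mapper> inputs : cases) {
            System.out.printf("%s any=%b all=%b notAny=%b%n",
                    inputs, anyNonFull(inputs), allNonFull(inputs), notAnyFull(inputs));
        }
    }
}
```

Only the mixed row gives `any=true` while `all` and `notAny` are `false`; for all-FULL or all-non-FULL inputs the three variants agree. A test therefore needs a vertex with at least one FULL and one non-FULL input to tell them apart.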
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on PR #24784: URL: https://github.com/apache/flink/pull/24784#issuecomment-2124300783

> ## CI report:
>
> * [ca6f434](https://github.com/apache/flink/commit/ca6f43494af9f19bddba40487a90bc4cbfb2f5ff) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=59730)
>
> Bot commands

@flinkbot run azure
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1609248919

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##

```diff
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
```

Review Comment:
I'm sorry, but I think this is too complicated a way to implement the test. That's why I came up with an idea that is not as elegant and correct as yours, but is easier to implement and gets by with fewer code changes. Please check it out. Thank you for your patience.
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on PR #24784: URL: https://github.com/apache/flink/pull/24784#issuecomment-2120974575

Sorry to bother you again, but I've opened a separate [pull request with a hotfix](https://github.com/apache/flink/pull/24811).
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1607065866

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ##

```diff
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });
```

Review Comment:
Thanks for the analysis. You are right: I think you can change a test to cover the incomplete FULL case. I hope I didn't make a mistake in my note about the error last time.
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1607061596

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##

```diff
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
```

Review Comment:
A more difficult way out of the situation, but quite feasible. OK, thank you.
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1606882746

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##

```diff
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
```

Review Comment:

> You could always most likely change the record type for UnalignedCheckpointRescaleITCase. String there should work just as fine, but that's probably more work vs option 1.

I've just realised that I don't see any place in `UnalignedCheckpointRescaleITCase` that hardcodes the record type. You can add a new `Topology` (`UnalignedCheckpointRescaleITCase.Topology`) that creates any JobGraph, so you can keep your proposed record format.

Also, I wouldn't mind if you re-used the same `LongSource` but changed the record type from `Long` to some POJO of `Long` and `String payload`:

```
public static class Record {
    private final long value;
    private final @Nullable String payload;
    (...)
}
```

where the `payload` length/size would be configurable, from 0 (`payload == null`) up to whatever value you configure (`3713`?). This way you won't be duplicating all of the setup code and you will leverage the same
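A plain-Java sketch of the suggested record shape, assuming a hypothetical factory `of(value, payloadLength)` that derives the payload from a configured length (0 yields `null`). It is renamed `PayloadRecord` here to avoid clashing with `java.lang.Record` on newer JDKs, and it is not wired into `LongSource` or any Flink serializer; it only shows how one source could cover both the no-payload and the large-payload (e.g. `3713`) case.

```java
import java.util.Arrays;

/** Hypothetical sketch of a long value plus an optional, configurable-size payload. */
class PayloadRecord {
    private final long value;
    private final String payload; // null when no payload is configured (the @Nullable case)

    private PayloadRecord(long value, String payload) {
        this.value = value;
        this.payload = payload;
    }

    /** Creates a record whose payload has exactly payloadLength characters; 0 means null. */
    static PayloadRecord of(long value, int payloadLength) {
        if (payloadLength < 0) {
            throw new IllegalArgumentException("payloadLength must be >= 0: " + payloadLength);
        }
        if (payloadLength == 0) {
            return new PayloadRecord(value, null);
        }
        char[] filler = new char[payloadLength];
        Arrays.fill(filler, 'x'); // arbitrary filler character
        return new PayloadRecord(value, new String(filler));
    }

    long getValue() {
        return value;
    }

    String getPayload() {
        return payload;
    }

    int payloadLength() {
        return payload == null ? 0 : payload.length();
    }

    public static void main(String[] args) {
        PayloadRecord small = PayloadRecord.of(42L, 0);
        PayloadRecord large = PayloadRecord.of(42L, 3713);
        System.out.println(small.getValue() + " " + small.payloadLength());
        System.out.println(large.getValue() + " " + large.payloadLength());
    }
}
```

Keeping the size as a parameter lets a single topology exercise both small and large records. If such a type should go through Flink's POJO serializer rather than a generic fallback, Flink's documented POJO rules (public class, public no-argument constructor, fields that are public and non-final or exposed via getters/setters) would apply, which the `final` fields in this sketch do not satisfy.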
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1606820986

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ##

```diff
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });
```

Review Comment:
Bumping:

> Maybe in StateAssignmentOperationTest create a unit test that has one FULL and one something else, and assert that the assigned states are as they should be?

> Does this bug have test coverage? I mean, either was there some test failing or have you added a new test to cover for a future regression?

Sorry to bother you again, but the unit test that you have added still doesn't cover this bug. When I try running your previous version of the code:

```
boolean noNeedRescale =
        stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
                        .map(JobEdge::getDownstreamSubtaskStateMapper)
                        .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
                && stateAssignment.executionJobVertex.getInputs().stream()
                        .map(IntermediateResult::getProducer)
                        .map(vertexAssignments::get)
                        .anyMatch(
                                taskStateAssignment -> {
                                    final int oldParallelism =
                                            stateAssignment
                                                    .oldState
                                                    .get(stateAssignment.inputOperatorID)
                                                    .getParallelism();
                                    return oldParallelism
                                            == taskStateAssignment.executionJobVertex
                                                    .getParallelism();
                                });

if (inputState.getParallelism() == executionJobVertex.getParallelism() && !noNeedRescale) {
    stateAssignment.inputChannelStates.putAll(
            toInstanceMap(stateAssignment.inputOperatorID, inputOperatorState));
    return;
}
```

the tests in `StateAssignmentOperationTest` are still green.
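One way to look for the missing coverage is to model both conditions on plain values and enumerate small scenarios, collecting the inputs on which they disagree; a regression test needs at least one of those. The sketch below is hypothetical: `Scenario`, `Mapper` and the method names are illustrative stand-ins, not Flink types, and it models only the boolean decision (the removed `inputState.getParallelism() == executionJobVertex.getParallelism()` check versus the previous version quoted above), not the resulting state assignment.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model comparing two versions of the "reuse input channel state as-is" decision. */
class DecisionDiff {
    enum Mapper { FULL, ROUND_ROBIN }

    /** Simplified inputs to the decision (illustrative, not Flink's types). */
    static final class Scenario {
        final int oldParallelism;                   // input operator parallelism in the checkpoint
        final int newParallelism;                   // vertex parallelism after restore
        final List<Mapper> inputMappers;            // downstream mapper per input edge
        final List<Integer> upstreamNewParallelism; // new parallelism per upstream producer

        Scenario(int oldP, int newP, List<Mapper> mappers, List<Integer> upstream) {
            this.oldParallelism = oldP;
            this.newParallelism = newP;
            this.inputMappers = mappers;
            this.upstreamNewParallelism = upstream;
        }

        @Override
        public String toString() {
            return "old=" + oldParallelism + " new=" + newParallelism
                    + " mappers=" + inputMappers + " upstream=" + upstreamNewParallelism;
        }
    }

    // Removed line: reuse state directly iff the parallelism is unchanged.
    static boolean original(Scenario s) {
        return s.oldParallelism == s.newParallelism;
    }

    // Previous version quoted above, modelled with the same anyMatch / anyMatch structure.
    static boolean previousVersion(Scenario s) {
        boolean noNeedRescale =
                s.inputMappers.stream().anyMatch(m -> m != Mapper.FULL)
                        && s.upstreamNewParallelism.stream().anyMatch(p -> p == s.oldParallelism);
        return s.oldParallelism == s.newParallelism && !noNeedRescale;
    }

    /** Returns every scenario in a small grid on which the two conditions disagree. */
    static List<Scenario> disagreements() {
        List<List<Mapper>> mapperSets = List.of(
                List.of(Mapper.FULL),
                List.of(Mapper.ROUND_ROBIN),
                List.of(Mapper.FULL, Mapper.ROUND_ROBIN));
        List<Scenario> result = new ArrayList<>();
        for (int oldP = 1; oldP <= 2; oldP++) {
            for (int newP = 1; newP <= 2; newP++) {
                for (List<Mapper> mappers : mapperSets) {
                    for (int upP = 1; upP <= 2; upP++) {
                        Scenario s = new Scenario(oldP, newP, mappers, List.of(upP));
                        if (original(s) != previousVersion(s)) {
                            result.add(s);
                        }
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        disagreements().forEach(System.out::println);
    }
}
```

In this model the two versions only disagree when the parallelism is unchanged, at least one input is non-FULL, and an upstream producer's new parallelism equals the old one; a test whose scenarios never hit that combination cannot tell them apart.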
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1606228058

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##

```diff
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
```

Review Comment:
For the test to be repeatable, it is important that a record gets split into two parts (one at the sink input, one at the source output). That's why we use a sufficiently long string with all bytes set to 1: if a record is split, reading that data is sure to lead to an exception. That's why we need to implement the 3rd option. But maybe instead of changing `Long` to `String`, we could just add a `StringSource` specifically for the `CustomPartitioner` case.
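The all-ones payload trick can be illustrated in isolation. This sketch assumes a reader that decodes a 4-byte big-endian length prefix before each record; that framing is an assumption made for illustration, not a statement about Flink's exact wire format. If such a reader resumes inside an all-`0x01` payload, it decodes `0x01010101` as the "length", a value far larger than any real record, so a mis-split record fails loudly instead of silently. The helper names and the sizes used are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical illustration of why an all-0x01 payload makes a mis-split record fail loudly. */
class MisalignedReadDemo {
    /** Reads 4 bytes at offset as a big-endian int, the way a length prefix would be decoded. */
    static int readLengthAt(byte[] data, int offset) {
        return ByteBuffer.wrap(data, offset, Integer.BYTES).getInt();
    }

    /**
     * Counts how many of n back-to-back records of recordSize bytes cross a boundary
     * between buffers of bufferSize bytes, i.e. would be split across two buffers.
     */
    static int countSpanning(int n, int recordSize, int bufferSize) {
        int spanning = 0;
        for (int i = 0; i < n; i++) {
            long start = (long) i * recordSize;
            long end = start + recordSize - 1; // offset of the record's last byte
            if (start / bufferSize != end / bufferSize) {
                spanning++;
            }
        }
        return spanning;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[3713];
        Arrays.fill(payload, (byte) 1);
        // Resuming inside the payload decodes 0x01010101 as a "length".
        System.out.println(readLengthAt(payload, 100)); // prints 16843009
        // 8 records of 4 bytes in 10-byte buffers: records at offsets 8..11 and 28..31 straddle a boundary.
        System.out.println(countSpanning(8, 4, 10)); // prints 2
    }
}
```

With records that are large relative to the buffer size, a larger share of them straddles a buffer boundary, which is what makes the split likely in the first place.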
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1606219365

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java: ##
```diff
@@ -232,7 +232,9 @@ public Predicate> apply(InputChannelInfo channelInfo) {
                 channelInfo.getGateIdx(), this::createPartitioner);
         // use a copy of partitioner to ensure that the filter of ambiguous virtual channels
         // have the same state across several subtasks
-        return new RecordFilter<>(partitioner.copy(), inputSerializer, subtaskIndex);
+        StreamPartitioner partitionerCopy = partitioner.copy();
+        partitionerCopy.setup(numberOfChannels);
+        return new RecordFilter<>(partitionerCopy, inputSerializer, subtaskIndex);
```

Review Comment:
Extracted to a `[hotfix]` [PR](https://github.com/apache/flink/pull/24811). Could you review that one too?
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on PR #24784: URL: https://github.com/apache/flink/pull/24784#issuecomment-2117031802

Also, please note that Azure is failing on some compilation issue. Probably a checkstyle violation.
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1604531639

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java: ##
```diff
@@ -232,7 +232,9 @@ public Predicate> apply(InputChannelInfo channelInfo) {
                 channelInfo.getGateIdx(), this::createPartitioner);
         // use a copy of partitioner to ensure that the filter of ambiguous virtual channels
         // have the same state across several subtasks
-        return new RecordFilter<>(partitioner.copy(), inputSerializer, subtaskIndex);
+        StreamPartitioner partitionerCopy = partitioner.copy();
+        partitionerCopy.setup(numberOfChannels);
+        return new RecordFilter<>(partitionerCopy, inputSerializer, subtaskIndex);
```

Review Comment:
Ok, in that case: is this a production bug (sounds like it), or only an issue in testing?

1. If this is a production bug, please:
   - create a separate Jira ticket for this issue
   - extract this change to a separate commit (can be part of this PR or a new PR, as you prefer)
   - add test coverage?
2. If that's just a testing issue:
   - extract this change to a separate `[hotfix]` commit in this PR

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##
```diff
@@ -0,0 +1,182 @@
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
```

Review Comment:
> But for the regular repeatability of the test it was necessary to use the String.

There are plenty of solutions to this problem:

1. In your `String` record you are encoding three fields: `partition`, `index` and some `payload`. But I don't see you using anything but the `partition`, so you could convert your whole `String` record into a `Long` record holding just the `partition`.
2. You can encode any number of fields in a single `Long` just as well. The easiest would be something like this:

```java
import static org.apache.flink.util.Preconditions.checkArgument;

// Packs partition (10 bits), index (10 bits) and payload (remaining bits) into one long.
long encode(int partition, int index, int payload) {
    checkArgument(partition >= 0 && partition < 1024);
    checkArgument(index >= 0 && index < 1024);
    checkArgument(payload >= 0);
    // Multiply in long arithmetic: payload * 1024 * 1024 in int overflows for payload >= 2048.
    return partition + index * 1024L + payload * 1024L * 1024L;
}
```

3. You could most likely also change the record type for `UnalignedCheckpointRescaleITCase`. `String` should work there just as well, but that's probably more work than option 1.

> but I'm not sure. Also in UnalignedCheckpointRescaleITCase rescale full graph (change parallelism for all vertexes), but need to change only one vertex

When creating the job graph in `Topology` (`UnalignedCheckpointRescaleITCase.Topology#create`), you can set parallelism per vertex and AFAIK that will ov
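Option 2 can be paired with a matching decoder. This is a minimal plain-Java sketch using the same layout as the `encode` above (10 bits for `partition`, 10 bits for `index`, the remaining bits for `payload`), written with shifts and masks; the class and method names are hypothetical, and it throws `IllegalArgumentException` directly instead of using Flink's `Preconditions`.

```java
final class PackedRecordCodec {
    private static final int BITS = 10;                  // 1024 == 1 << 10
    private static final long MASK = (1L << BITS) - 1;   // lowest 10 bits set

    // Same value as partition + index * 1024 + payload * 1024 * 1024, computed in long.
    static long encode(int partition, int index, int payload) {
        if (partition < 0 || partition >= 1024 || index < 0 || index >= 1024 || payload < 0) {
            throw new IllegalArgumentException("field out of range");
        }
        return partition | ((long) index << BITS) | ((long) payload << (2 * BITS));
    }

    static int partition(long record) {
        return (int) (record & MASK);
    }

    static int index(long record) {
        return (int) ((record >>> BITS) & MASK);
    }

    static int payload(long record) {
        return (int) (record >>> (2 * BITS));
    }

    public static void main(String[] args) {
        long r = encode(3, 7, 5000);
        System.out.println(partition(r) + " " + index(r) + " " + payload(r)); // 3 7 5000
    }
}
```

Since the three bit ranges do not overlap, the bitwise OR gives the same result as the addition in the original formula.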
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1603118510

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##
```diff
@@ -0,0 +1,182 @@
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
```

Review Comment:
I thought about it, but `UnalignedCheckpointRescaleITCase` uses `Long` as the stream data, while regular repeatability of this test required `String` records. I will try to add the test to `UnalignedCheckpointRescaleITCase`, but I'm not sure. Also, `UnalignedCheckpointRescaleITCase` rescales the full graph (changes the parallelism of all vertexes), whereas here only one vertex needs to change, not all of them. I have no idea how to add a new `Topology` for this test.
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1603115904

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##
```diff
@@ -0,0 +1,182 @@
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
+
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final File CHECKPOINT_FILE = new File("src/test/resources/custom-checkpoint");
```

Review Comment:
Ok. I agree.
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1603115064

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java: ##
```diff
@@ -232,7 +232,9 @@ public Predicate> apply(InputChannelInfo channelInfo) {
                 channelInfo.getGateIdx(), this::createPartitioner);
         // use a copy of partitioner to ensure that the filter of ambiguous virtual channels
         // have the same state across several subtasks
-        return new RecordFilter<>(partitioner.copy(), inputSerializer, subtaskIndex);
+        StreamPartitioner partitionerCopy = partitioner.copy();
+        partitionerCopy.setup(numberOfChannels);
+        return new RecordFilter<>(partitionerCopy, inputSerializer, subtaskIndex);
```

Review Comment:
While testing the reported bug, I found a small problem with copying the partitioner in the rescaling case. It can throw `ArithmeticException: / by zero` for custom partitioners (which use something like ` ... % numberOfChannels`), because after copying, `numberOfChannels` is 0.
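A minimal plain-Java model of the failure described above. The class is hypothetical and only mimics the relevant behaviour reported in the comment (a copy whose `numberOfChannels` is still 0); it is not Flink's `StreamPartitioner` API. Calling `selectChannel` on the fresh copy fails on the modulo, and calling `setup(...)` on the copy first, as the diff does, avoids it.

```java
// Hypothetical stand-in for a custom partitioner; not Flink's StreamPartitioner API.
class ModuloPartitioner {
    // Stays 0 until setup() is called.
    private int numberOfChannels;

    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // Like a custom partitioner that computes key % numberOfChannels.
    int selectChannel(long key) {
        return (int) (key % numberOfChannels);
    }

    // Models the reported behaviour: the copy does not carry numberOfChannels over.
    ModuloPartitioner copy() {
        return new ModuloPartitioner();
    }

    public static void main(String[] args) {
        ModuloPartitioner original = new ModuloPartitioner();
        original.setup(4);

        ModuloPartitioner copy = original.copy();
        try {
            copy.selectChannel(10);
        } catch (ArithmeticException e) {
            System.out.println("without setup: " + e.getClass().getSimpleName()); // without setup: ArithmeticException
        }

        copy.setup(4); // corresponds to partitionerCopy.setup(numberOfChannels) in the diff
        System.out.println("with setup: " + copy.selectChannel(10)); // with setup: 2
    }
}
```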
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1603110590

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ##
```diff
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });
```

Review Comment:
The mistake was semantic: the condition checked that all of the inputs were of `FULL` type, whereas it is enough for at least one of them to be. But since the partitioner type seems to be the same for all inputs, I don't see how to add a test to check it.
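The difference between the two conditions is easiest to see with plain Java streams. In this minimal sketch, `Mapper` is a hypothetical enum standing in for `SubtaskStateMapper`. `anyMatch(m -> m != FULL)` negates to "all inputs are FULL", while `noneMatch(m -> m == FULL)` negates to "at least one input is FULL". For a non-empty list of inputs, the two only disagree when the inputs use different mappers, which is consistent with the observation that the bug is hard to cover with a test when all inputs share the same partitioner type. (For an empty list they also differ, since `anyMatch` returns `false` and `noneMatch` returns `true`.)

```java
import java.util.List;

// Hypothetical stand-in for Flink's SubtaskStateMapper; only "FULL or not" matters here.
enum Mapper { FULL, ROUND_ROBIN, RANGE }

class FullMapperCheck {
    // As written in the diff: true if at least one input is NOT FULL.
    // Negated, it means "all inputs are FULL".
    static boolean anyNotFull(List<Mapper> inputs) {
        return inputs.stream().anyMatch(m -> m != Mapper.FULL);
    }

    // As described in the comment: true if no input is FULL.
    // Negated, it means "at least one input is FULL".
    static boolean noneFull(List<Mapper> inputs) {
        return inputs.stream().noneMatch(m -> m == Mapper.FULL);
    }

    public static void main(String[] args) {
        List<Mapper> mixed = List.of(Mapper.FULL, Mapper.ROUND_ROBIN);
        System.out.println(anyNotFull(mixed)); // true
        System.out.println(noneFull(mixed));   // false

        List<Mapper> allFull = List.of(Mapper.FULL, Mapper.FULL);
        System.out.println(anyNotFull(allFull) == noneFull(allFull)); // true
    }
}
```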
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1602844798

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##
```diff
@@ -0,0 +1,182 @@
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
```

Review Comment:
nit: `UnalignedCheckpointRescaleWithCustomPartitionITCase`?

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##
```diff
@@ -0,0 +1,182 @@
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
+
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    pr
```
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1602832692

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ##
```diff
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });
```

Review Comment:
Thanks for the explanation.

> But I made little mistake in condition. I will fix it

Does this bug have test coverage? I mean, was there either some test failing, or have you added a new test to guard against a future regression?
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2111794984

> ## CI report:
>
> * [309cf5b](https://github.com/apache/flink/commit/309cf5b7c295a56527b71ac0a1869ce06cd3a244) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=59567)
>
> Bot commands

@flinkbot run azure
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1600910971

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)

Review Comment:
Separating the two parts is a good idea, OK.

The first part checks whether `SubtaskStateMapper.FULL` is used by any input. This mapper type always returns all inputs/outputs, and the problem reproduces only with this mapper type.

The second part checks whether the parallelism of any upstream (producer) operator has changed. If it has, the number of outputs has changed as well.

But I made a little mistake in the condition. I will fix it.
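The two-part check described in this comment can be illustrated with a minimal, self-contained Java sketch. It is not Flink code, and it is not the fix that superseded this PR in #24857. `Mapper`, `Input` and `noNeedRescale` are hypothetical stand-ins.

The rule it encodes comes from the comment rather than from Flink's `StateAssignmentOperation`: only `FULL` edges are affected, and a change in a producer's parallelism changes the number of input channels. The sketch also follows the described intent rather than the exact quoted condition, which the author says contains a mistake.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of the two-part rescale check discussed in the review.
 * These are NOT Flink classes; they only mirror the idea described in the comment.
 */
public class RescaleCheckSketch {

    /** Stand-in for SubtaskStateMapper; only FULL matters for this check. */
    enum Mapper { FULL, OTHER }

    /** Stand-in for one input edge: the downstream mapper and the producer's old/new parallelism. */
    record Input(Mapper downstreamMapper, int oldProducerParallelism, int newProducerParallelism) {
        boolean producerRescaled() {
            return oldProducerParallelism != newProducerParallelism;
        }
    }

    /**
     * Returns true when the old input channel state can be handed to subtasks 1:1.
     * Part 1 (the original check): this vertex's own parallelism is unchanged.
     * Part 2 (the added check, per the comment): no FULL-mapped input has a producer whose
     * parallelism changed, because that changes the number of input channels per subtask.
     */
    static boolean noNeedRescale(int oldParallelism, int newParallelism, List<Input> inputs) {
        if (oldParallelism != newParallelism) {
            return false;
        }
        return inputs.stream()
                .noneMatch(input -> input.downstreamMapper() == Mapper.FULL && input.producerRescaled());
    }

    public static void main(String[] args) {
        // Downstream parallelism unchanged, but a FULL input's producer went from 2 to 4:
        // the old parallelism-only check would skip redistribution; this one does not.
        List<Input> rescaledFullInput = List.of(new Input(Mapper.FULL, 2, 4));
        System.out.println(noNeedRescale(3, 3, rescaledFullInput)); // false

        // The same producer change on a non-FULL edge does not force redistribution in this model.
        List<Input> rescaledOtherInput = List.of(new Input(Mapper.OTHER, 2, 4));
        System.out.println(noNeedRescale(3, 3, rescaledOtherInput)); // true
    }
}
```

Keeping the two parts as separate, named predicates (the vertex's own parallelism first, then the per-input `FULL`/producer check) also matches the reviewer's request to reason about each part on its own.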
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1599796750

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)

Review Comment:
Could you explain the logic behind this condition? Maybe both of its parts separately?

```java
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
        .map(JobEdge::getDownstreamSubtaskStateMapper)
        .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
```

and

```java
stateAssignment.executionJobVertex.getInputs().stream()
        .map(IntermediateResult::getProducer)
        .map(vertexAssignments::get)
        .anyMatch(
                taskStateAssignment -> {
                    final int oldParallelism =
                            stateAssignment
                                    .oldState
                                    .get(stateAssignment.inputOperatorID)
                                    .getParallelism();
                    return oldParallelism
                            == taskStateAssignment.executionJobVertex
                                    .getParallelism();
                })
```

?
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
flinkbot commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2109757235

## CI report:

* 487dc362ac9f6d0b4b32f7545d4a1d7329de1342 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima opened a new pull request, #24784:
URL: https://github.com/apache/flink/pull/24784

## What is the purpose of the change

To fix [FLINK-35351](https://issues.apache.org/jira/browse/FLINK-35351).

## Verifying this change

This change is covered by the new integration test `UnalignedCheckpointCustomRescaleITCase#restoreFromCheckpoint`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)