Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-29 Thread via GitHub


pnowojski closed pull request #24784: [FLINK-35351][checkpoint] Fix fail during 
restore from unaligned chec…
URL: https://github.com/apache/flink/pull/24784


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-28 Thread via GitHub


ldadima commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2136510387

   > Thanks for the update @ldadima . I have fixed up a couple of things in
   > #24857 so let me close this PR in favour of that one. Could you take a look at
   > my version?
   
   I checked your PR and agree with the changes to the ITCase. In my version I
   forgot about the uid, which is why the map vertex did not work for me in the
   rescale case. Thanks for the changes.





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-28 Thread via GitHub


pnowojski commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2135614198

   Thanks for the update @ldadima . I have fixed up a couple of things in 
https://github.com/apache/flink/pull/24857 so let me close this PR in favour of 
that one. Could you take a look at my version?





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-22 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1609545011


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   Earlier I misjudged the error because I did not notice the negation in the
   condition. But if you substitute `anyMatch` for `allMatch`, you can see that
   the test stops passing.
   I also changed gateIdx from a randomly generated value to always zero (in
   `StateHandleDummyUtil#createNewInputChannelStateHandle`). This change does not
   affect the other tests, but I need zero for this test because the number of
   input gates has to be stable.






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-22 Thread via GitHub


ldadima commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2124300783

   > ## CI report:
   > * [ca6f434](https://github.com/apache/flink/commit/ca6f43494af9f19bddba40487a90bc4cbfb2f5ff) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=59730)
   > 
   > Bot commands
   
   @flinkbot run azure





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-21 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1609248919


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   I'm sorry, but I think this is too complicated a way to implement the test.
   That's why I came up with an idea that is not as elegant and correct as yours,
   but is easier to implement and requires changing less code.
   Please check it out. Thank you for your patience.






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


ldadima commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2120974575

   Sorry to bother you again, but I opened a separate [pull request with the
   hotfix](https://github.com/apache/flink/pull/24811).





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1607065866


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   Thanks for the analysis. You are right: I think a test can be changed to
   cover the incomplete-FULL case. I hope I did not get the note about the error
   wrong last time.






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1607061596


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   That is a difficult way out of the situation, but quite feasible. OK, thank you.






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606882746


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   > You could always most likely change the record type for
   > UnalignedCheckpointRescaleITCase. String there should work just as fine, but
   > that's probably more work vs option 1.
   
   I've just realised that I don't see any place in the 
`UnalignedCheckpointRescaleITCase` that hardcodes the type of the record. You 
can add a new `Topology` (`UnalignedCheckpointRescaleITCase.Topology`) that 
creates any JobGraph, so you can keep your proposed records format. Also I 
wouldn't mind if you re-used the same `LongSource`, but changed the record type 
from `Long` to some POJO of `Long` and `String payload`
   ```
   public static class Record {
       private final long value;
       private final @Nullable String payload;
       (...)
   }
   ```
   where the `payload` length/size would be configurable, from 0 (`payload ==
   null`) up to whatever value you configure (`3713`?).
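
A minimal, self-contained sketch of such a record type might look as follows. Everything beyond the two fields named above is an assumption made for illustration: the class name `RecordSketch`, the constructor taking a `payloadLength`, the getters, and the filler character `'x'`. The `@Nullable` annotation is left out so that the sketch compiles without extra dependencies.

```java
import java.util.Arrays;

public class RecordSketch {
    /** Hypothetical record: a long value plus an optional payload of configurable length. */
    public static final class Record {
        private final long value;
        private final String payload; // null when payloadLength == 0

        public Record(long value, int payloadLength) {
            if (payloadLength < 0) {
                throw new IllegalArgumentException("payloadLength must be >= 0");
            }
            this.value = value;
            this.payload = payloadLength == 0 ? null : repeat('x', payloadLength);
        }

        public long getValue() {
            return value;
        }

        public String getPayload() {
            return payload;
        }
    }

    // Builds a string consisting of n copies of c.
    private static String repeat(char c, int n) {
        char[] chars = new char[n];
        Arrays.fill(chars, c);
        return new String(chars);
    }

    public static void main(String[] args) {
        Record small = new Record(1L, 0);    // no payload
        Record large = new Record(2L, 3713); // length floated in the comment above
        System.out.println(small.getPayload() == null);
        System.out.println(large.getPayload().length());
    }
}
```

With a payload length of 0 the record carries only the `long`, so a source producing it would behave like a plain `Long` source.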
   
   This way you won't be duplicating all of the setup code and you will 
leverage the same 
   
   






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606820986


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   Bumping:
   
   > Maybe in StateAssignmentOperationTest create a unit test that has one FULL
   > and one something else, and assert that the assigned states are as they should
   > be?
   
   > Does this bug have a test coverage? I mean, either was there some test
   > failing or have you added a new test to cover for a future regression?
   
   Sorry to bother you again, but the unit test that you have added still
   doesn't cover this bug. When I try running your previous version of the
   code:
   ```
   boolean noNeedRescale =
           stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
                           .map(JobEdge::getDownstreamSubtaskStateMapper)
                           .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
                   && stateAssignment.executionJobVertex.getInputs().stream()
                           .map(IntermediateResult::getProducer)
                           .map(vertexAssignments::get)
                           .anyMatch(
                                   taskStateAssignment -> {
                                       final int oldParallelism =
                                               stateAssignment
                                                       .oldState
                                                       .get(stateAssignment.inputOperatorID)
                                                       .getParallelism();
                                       return oldParallelism
                                               == taskStateAssignment.executionJobVertex
                                                       .getParallelism();
                                   });
   
   if (inputState.getParallelism() == executionJobVertex.getParallelism() && !noNeedRescale) {
       stateAssignment.inputChannelStates.putAll(
               toInstanceMap(stateAssignment.inputOperatorID, inputOperatorState));
       return;
   }
   ```
   
   the tests in `StateAssignmentOperationTest` are still green.






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-19 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606228058


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   For the test to be repeatable, the record has to be split into two
parts (one at the sink input, one at the source output). That's why we use a
sufficiently long string with all bytes set to 1: if the record is split, the
data that gets read is sure to lead to the exception.
   That's why the 3rd option needs to be implemented. But maybe we don't have to
change `Long` to `String`, and can just add a `StringSource` specifically for
the CustomPartitioner case.
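
A record payload of the kind described above could be built roughly as follows. This is only a sketch: the helper name `PayloadFactory`, the method `payloadOfOnes` and the length of 4096 are assumptions made for illustration, since the comment does not say how long the string in the test actually is.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper, not taken from the PR: builds a long string whose bytes are all 1. */
final class PayloadFactory {

    /** Returns a string of the given length in which every byte has the value 1. */
    static String payloadOfOnes(int length) {
        byte[] bytes = new byte[length];
        Arrays.fill(bytes, (byte) 1);
        // Byte value 1 is a valid US-ASCII character (U+0001), so the round trip is lossless.
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // The length is an assumed value; it only has to be large enough for the record to be split.
        String payload = payloadOfOnes(4096);
        System.out.println("payload length: " + payload.length());
    }
}
```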






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-19 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606228058


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with 
custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   For the test to be repeatable, the record has to be split into two
parts (one at the sink input, one at the source output). That's why we use a
sufficiently long string with all bytes set to 1: if the record is split, the
data that gets read is sure to lead to the exception.
   That's why the 3rd option needs to be implemented.






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-19 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606219365


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java:
##
@@ -232,7 +232,9 @@ public Predicate> apply(InputChannelInfo 
channelInfo) {
 channelInfo.getGateIdx(), this::createPartitioner);
 // use a copy of partitioner to ensure that the filter of 
ambiguous virtual channels
 // have the same state across several subtasks
-return new RecordFilter<>(partitioner.copy(), inputSerializer, 
subtaskIndex);
+StreamPartitioner partitionerCopy = partitioner.copy();
+partitionerCopy.setup(numberOfChannels);
+return new RecordFilter<>(partitionerCopy, inputSerializer, 
subtaskIndex);

Review Comment:
   Extracted into a separate `[hotfix]` [PR](https://github.com/apache/flink/pull/24811).
   Could you review that one too?






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-17 Thread via GitHub


pnowojski commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2117031802

   Also, please note that the Azure build is failing on a compilation issue,
probably a checkstyle violation.





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-17 Thread via GitHub


pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1604531639


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java:
##
@@ -232,7 +232,9 @@ public Predicate> apply(InputChannelInfo 
channelInfo) {
 channelInfo.getGateIdx(), this::createPartitioner);
 // use a copy of partitioner to ensure that the filter of 
ambiguous virtual channels
 // have the same state across several subtasks
-return new RecordFilter<>(partitioner.copy(), inputSerializer, 
subtaskIndex);
+StreamPartitioner partitionerCopy = partitioner.copy();
+partitionerCopy.setup(numberOfChannels);
+return new RecordFilter<>(partitionerCopy, inputSerializer, 
subtaskIndex);

Review Comment:
   OK, in that case: is this a production bug (it sounds like one), or only an
issue in testing?
   
   1. If this is a production bug, please:
      - create a separate Jira ticket for this issue
      - extract this change into a separate commit (it can be part of this PR
        or a new PR, as you prefer)
      - add test coverage?
   2. If it is just a testing issue:
      - extract this change into a separate `[hotfix]` commit in this PR



##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with 
custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   >  But for the regular repeatability of the test it was necessary to use the 
String.
   
   There are plenty of solutions to this problem:
   
   1. Your `String` record encodes three fields: `partition`, `index` and some
`payload`. But I don't see you using anything other than the `partition`, so
you could convert the whole `String` record into a `Long` record that holds
just the `partition` value.
   2. You can encode any number of fields in a single `Long` just as well. The
easiest would be something like this:
   ```
   long encode(int partition, int index, long payload) {
       checkArgument(partition >= 0 && partition < 1024);
       checkArgument(index >= 0 && index < 1024);
       checkArgument(payload >= 0 && payload < Long.MAX_VALUE / (1024 * 1024));
       // long arithmetic, so a large payload cannot overflow an int
       return partition + index * 1024L + payload * 1024L * 1024L;
   }
   ```
   3. You could most likely also change the record type for
`UnalignedCheckpointRescaleITCase`. `String` should work just as well there,
but that's probably more work than option 1.
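
Option 2 can also be written out as a self-contained round trip, which makes it easier to see that all three fields survive the packing. This is a sketch under assumptions: the class name `PackedRecord` and the `decode*` helpers are hypothetical, a plain `IllegalArgumentException` stands in for `checkArgument`, and the 1024 limit for `partition` and `index` is taken from the option 2 description.

```java
/** Hypothetical helper, not part of Flink: packs three small fields into one long. */
final class PackedRecord {
    // partition and index each take values in [0, 1024), as in option 2.
    private static final long FIELD_SIZE = 1024L;

    static long encode(int partition, int index, long payload) {
        check(partition >= 0 && partition < FIELD_SIZE, "partition out of range");
        check(index >= 0 && index < FIELD_SIZE, "index out of range");
        check(
                payload >= 0 && payload < Long.MAX_VALUE / (FIELD_SIZE * FIELD_SIZE),
                "payload out of range");
        // Everything is computed as long, so the multiplications cannot overflow an int.
        return partition + index * FIELD_SIZE + payload * FIELD_SIZE * FIELD_SIZE;
    }

    static int decodePartition(long record) {
        return (int) (record % FIELD_SIZE);
    }

    static int decodeIndex(long record) {
        return (int) ((record / FIELD_SIZE) % FIELD_SIZE);
    }

    static long decodePayload(long record) {
        return record / (FIELD_SIZE * FIELD_SIZE);
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        long record = encode(3, 7, 42L);
        System.out.println(
                record
                        + " -> partition=" + decodePartition(record)
                        + ", index=" + decodeIndex(record)
                        + ", payload=" + decodePayload(record));
    }
}
```

With this layout a custom partitioner would only need `decodePartition`, while the other two fields stay available for assertions in the test.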
   
   > but I'm not sure. Also in UnalignedCheckpointRescaleITCase rescale full 
graph (change parallelism for all vertexes), but need to change only one vertex
   
   When creating job graph in `Topology`  
(`UnalignedCheckpointRescaleITCase.Topology#create`), you can set parallelism 
per vertex and AFAIK that will ov

Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-16 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1603118510


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with 
custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   I thought about it, but `UnalignedCheckpointRescaleITCase` uses `Long` as
the stream data, whereas for the test to be reliably repeatable it was
necessary to use `String`. I will try to add the test to
`UnalignedCheckpointRescaleITCase`, but I'm not sure. Also,
`UnalignedCheckpointRescaleITCase` rescales the full graph (it changes the
parallelism of all vertices), but here only one vertex needs to change, not
all. I have no idea how to add a new Topology for this test.






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-16 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1603115904


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with 
custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
+
+@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+private static final File CHECKPOINT_FILE = new 
File("src/test/resources/custom-checkpoint");

Review Comment:
   Ok. I agree






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-16 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1603115064


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java:
##
@@ -232,7 +232,9 @@ public Predicate> apply(InputChannelInfo 
channelInfo) {
 channelInfo.getGateIdx(), this::createPartitioner);
 // use a copy of partitioner to ensure that the filter of 
ambiguous virtual channels
 // have the same state across several subtasks
-return new RecordFilter<>(partitioner.copy(), inputSerializer, 
subtaskIndex);
+StreamPartitioner partitionerCopy = partitioner.copy();
+partitionerCopy.setup(numberOfChannels);
+return new RecordFilter<>(partitionerCopy, inputSerializer, 
subtaskIndex);

Review Comment:
   While testing the reported bug, I found a small problem with copying the
partitioner in the rescaling case. It can throw `ArithmeticException: / by zero`
for custom partitioners (which use something like `... % numberOfChannels`),
because after copying, `numberOfChannels` is 0.
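
The failure mode described here can be reproduced in isolation with a toy class. This is a sketch, not Flink code: `ModuloPartitioner` is a hypothetical stand-in for a stream partitioner wrapping a custom partitioner, and its `copy()` is assumed to return an instance on which `setup` has not been called yet.

```java
/** Toy stand-in for a stream partitioner; not Flink's actual class. */
final class ModuloPartitioner {
    // Stays 0 until setup() is called.
    private int numberOfChannels;

    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    /** Assumed copy semantics: the copy does not carry over the channel count. */
    ModuloPartitioner copy() {
        return new ModuloPartitioner();
    }

    /** Mimics a custom partitioner that computes "key % numberOfChannels". */
    int selectChannel(int key) {
        return key % numberOfChannels;
    }

    public static void main(String[] args) {
        ModuloPartitioner original = new ModuloPartitioner();
        original.setup(4);

        ModuloPartitioner copy = original.copy();
        try {
            copy.selectChannel(10);
        } catch (ArithmeticException e) {
            System.out.println("copy without setup failed: " + e.getMessage());
        }

        // Calling setup on the copy, as the change above does, avoids the division by zero.
        copy.setup(4);
        System.out.println("copy after setup selects channel " + copy.selectChannel(10));
    }
}
```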






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-16 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1603110590


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
 final List> inputOperatorState =
 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+boolean noNeedRescale =
+
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+.map(JobEdge::getDownstreamSubtaskStateMapper)
+.anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+&& 
stateAssignment.executionJobVertex.getInputs().stream()
+.map(IntermediateResult::getProducer)
+.map(vertexAssignments::get)
+.anyMatch(
+taskStateAssignment -> {
+final int oldParallelism =
+stateAssignment
+.oldState
+
.get(stateAssignment.inputOperatorID)
+.getParallelism();
+return oldParallelism
+== 
taskStateAssignment.executionJobVertex
+.getParallelism();
+});

Review Comment:
   The mistake was semantic. The code checked that all the inputs were of FULL
type, whereas it is enough for at least one of them to be. But since the
partitioner type seems to be the same for all inputs, I don't see how to add a
test for this.
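
The difference between "all inputs are FULL" and "at least one input is FULL" only shows up when the inputs are mixed, which can be seen with plain stream predicates. This is a sketch that does not reproduce the PR's logic: the `Mapper` enum is a local stand-in for `SubtaskStateMapper` with an assumed subset of values, and the method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of the two checks; not the code from the PR. */
final class MapperChecks {
    /** Local stand-in for SubtaskStateMapper, reduced to a few values. */
    enum Mapper {
        FULL,
        RANGE,
        ROUND_ROBIN
    }

    /** True only if every input uses the FULL mapper. */
    static boolean allFull(List<Mapper> mappers) {
        return mappers.stream().allMatch(m -> m == Mapper.FULL);
    }

    /** True as soon as one input uses the FULL mapper. */
    static boolean anyFull(List<Mapper> mappers) {
        return mappers.stream().anyMatch(m -> m == Mapper.FULL);
    }

    /** Logical negation of allFull for a non-empty list. */
    static boolean anyNotFull(List<Mapper> mappers) {
        return mappers.stream().anyMatch(m -> m != Mapper.FULL);
    }

    public static void main(String[] args) {
        List<Mapper> mixed = Arrays.asList(Mapper.FULL, Mapper.RANGE);
        System.out.println("allFull=" + allFull(mixed));
        System.out.println("anyFull=" + anyFull(mixed));
        System.out.println("anyNotFull=" + anyNotFull(mixed));
    }
}
```

For a list where every input has the same mapper, `allFull` and `anyFull` agree, which matches the remark above that a test distinguishing the two is hard to write if all inputs share one partitioner type.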






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-16 Thread via GitHub


pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1602844798


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with 
custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   nit: `UnalignedCheckpointRescaleWithCustomPartitionITCase`?



##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {
+
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+pr

Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-16 Thread via GitHub


pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1602832692


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   Thanks for the explanation.
   
   > But I made little mistake in condition. I will fix it
   
   Does this bug have test coverage? I mean, was some existing test failing, 
or have you added a new test to guard against a future regression?






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-15 Thread via GitHub


ldadima commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2111794984

   > ## CI report:
   > * [309cf5b](https://github.com/apache/flink/commit/309cf5b7c295a56527b71ac0a1869ce06cd3a244) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=59567)
   > 
   > Bot commands
   
   @flinkbot run azure





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-14 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1600910971


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   Separating the two parts is a good idea, ok.
   The first part checks whether SubtaskStateMapper.FULL is used on any input 
(this mapper type always returns all I/O); the problem reproduces only for 
this mapper type.
   The second part checks whether the parallelism of any previous operator has 
changed. If it has changed, the number of outputs has changed as well.
   But I made a small mistake in the condition. I will fix it.
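
   As a rough illustration of that split, the two checks could be written as 
separate, named predicates. This is only a sketch under assumptions: the 
`Mapper` enum and the plain parallelism lists are hypothetical stand-ins for 
`SubtaskStateMapper` and the upstream vertices, and joining the two parts with 
`&&` is my reading of the explanation above, not the final condition in the PR.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: all names here are hypothetical stand-ins, not Flink classes. */
class RescaleCheckSketch {

    /** Hypothetical stand-in for the mapper type configured on an input edge. */
    enum Mapper { FULL, RANGE, ROUND_ROBIN }

    /** Part 1: is the FULL mapper used on at least one input? */
    static boolean anyInputUsesFullMapper(List<Mapper> inputMappers) {
        return inputMappers.stream().anyMatch(m -> m == Mapper.FULL);
    }

    /** Part 2: did the parallelism of at least one previous operator change? */
    static boolean anyUpstreamParallelismChanged(
            List<Integer> oldParallelism, List<Integer> newParallelism) {
        if (oldParallelism.size() != newParallelism.size()) {
            throw new IllegalArgumentException("expected one entry per upstream operator");
        }
        for (int i = 0; i < oldParallelism.size(); i++) {
            if (oldParallelism.get(i).intValue() != newParallelism.get(i).intValue()) {
                return true;
            }
        }
        return false;
    }

    /** Assumed combination: redistribute only when both parts hold. */
    static boolean needsRedistribution(
            List<Mapper> inputMappers,
            List<Integer> oldParallelism,
            List<Integer> newParallelism) {
        return anyInputUsesFullMapper(inputMappers)
                && anyUpstreamParallelismChanged(oldParallelism, newParallelism);
    }

    public static void main(String[] args) {
        List<Mapper> mappers = Arrays.asList(Mapper.FULL);
        // Upstream operator rescaled from 2 to 3 subtasks.
        System.out.println(needsRedistribution(mappers, Arrays.asList(2), Arrays.asList(3)));
        // Upstream parallelism unchanged.
        System.out.println(needsRedistribution(mappers, Arrays.asList(2), Arrays.asList(2)));
    }
}
```

   Naming each part makes it easier to test them independently and to see 
which of the two the mistake in the condition belongs to.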






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-14 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1600910971


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   Separating the two parts is a good idea, ok.
   The first part checks whether SubtaskStateMapper.FULL is used on any input 
(this mapper type always returns all I/O); the problem reproduces only for 
this mapper type.
   The second part checks whether the parallelism of any previous operator has 
changed. If it has changed, the number of outputs has changed as well.






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-14 Thread via GitHub


pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1599796750


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   Could you explain the logic behind this condition? Maybe each of its two 
parts separately:
   ```
   stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
           .map(JobEdge::getDownstreamSubtaskStateMapper)
           .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
   ```
   and
   
   ```
   stateAssignment.executionJobVertex.getInputs().stream()
           .map(IntermediateResult::getProducer)
           .map(vertexAssignments::get)
           .anyMatch(
                   taskStateAssignment -> {
                       final int oldParallelism =
                               stateAssignment
                                       .oldState
                                       .get(stateAssignment.inputOperatorID)
                                       .getParallelism();
                       return oldParallelism
                               == taskStateAssignment.executionJobVertex
                                       .getParallelism();
                   })
   ```






Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-14 Thread via GitHub


flinkbot commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2109757235

   
   ## CI report:
   
   * 487dc362ac9f6d0b4b32f7545d4a1d7329de1342 UNKNOWN
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-14 Thread via GitHub


ldadima opened a new pull request, #24784:
URL: https://github.com/apache/flink/pull/24784

   ## What is the purpose of the change
   
   To fix [FLINK-35351](https://issues.apache.org/jira/browse/FLINK-35351)
   
   ## Verifying this change
   
   UnalignedCheckpointCustomRescaleITCase#restoreFromCheckpoint
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   

