rkhachatryan commented on code in PR #23591:
URL: https://github.com/apache/flink/pull/23591#discussion_r1373904163


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java:
##########
@@ -138,22 +108,43 @@ public void discardState() throws Exception {
 
     @Override
     public long getStateSize() {
-        return super.getStateSize() + metaDataState.getStateSize();
+        return directoryStateHandle.getStateSize() + metaStateHandle.getStateSize();
     }
 
     @Override
     public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + getMetaDataState().hashCode();
+        int result = directoryStateHandle.hashCode();
+        result = 31 * result + getKeyGroupRange().hashCode();
+        result = 31 * result + getMetaDataStateHandle().hashCode();
         return result;
     }
 
     @Override
     public String toString() {
         return "IncrementalLocalKeyedStateHandle{"
                 + "metaDataState="
-                + metaDataState
+                + metaStateHandle
                 + "} "
-                + super.toString();
+                + "DirectoryKeyedStateHandle{"
+                + "directoryStateHandle="
+                + directoryStateHandle
+                + ", keyGroupRange="
+                + keyGroupRange
+                + '}';
+    }
+
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
+        // Nothing to do, this is for local use only.

Review Comment:
   nit: throw an exception?
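One way to read the nit: the handle is described as local-only, so any call to `registerSharedStates` would indicate misuse. Failing fast makes that misuse visible, whereas the current no-op silently ignores it. Here is a minimal sketch, assuming hypothetical stand-in types (`SharedStateRegistryStub`, `LocalOnlyHandleSketch`) rather than Flink's real classes:

```java
// Hypothetical stand-in for Flink's SharedStateRegistry; not the real API.
interface SharedStateRegistryStub {}

// Hypothetical local-only handle illustrating the "fail loudly" option from the review.
final class LocalOnlyHandleSketch {

    void registerSharedStates(SharedStateRegistryStub stateRegistry, long checkpointID) {
        // Local handles are not meant to be registered with the shared state registry,
        // so reaching this method is treated as a programming error.
        throw new UnsupportedOperationException(
                "Local state handles must not be registered (checkpoint " + checkpointID + ")");
    }
}
```

Throwing is only safe if no existing code path calls `registerSharedStates` on local handles. This diff does not show whether that holds.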



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java:
##########
@@ -51,6 +55,108 @@ class PrioritizedOperatorSubtaskStateTest {
 
     private static final Random RANDOM = new Random(0x42);
 
+    @Test
+    void testTryCreateMixedLocalAndRemoteAlternative() {
+        testTryCreateMixedLocalAndRemoteAlternative(
+                StateHandleDummyUtil::createKeyedStateHandleFromSeed,
+                KeyedStateHandle::getKeyGroupRange);
+    }
+
+    <SH extends StateObject, ID> void testTryCreateMixedLocalAndRemoteAlternative(
+            IntFunction<SH> stateHandleFactory, Function<SH, ID> idExtractor) {
+        List<SH> jmState =
+                Arrays.asList(
+                        stateHandleFactory.apply(0),
+                        stateHandleFactory.apply(1),
+                        stateHandleFactory.apply(2),
+                        stateHandleFactory.apply(3));
+
+        List<SH> alternativeA =
+                Arrays.asList(stateHandleFactory.apply(0), stateHandleFactory.apply(3));
+
+        List<SH> alternativeB =
+                Arrays.asList(
+                        stateHandleFactory.apply(1),
+                        stateHandleFactory.apply(3),
+                        stateHandleFactory.apply(5));
+
+        List<StateObjectCollection<SH>> alternatives =
+                Arrays.asList(
+                        new StateObjectCollection<>(alternativeA),
+                        new StateObjectCollection<>(Collections.emptyList()),
+                        new StateObjectCollection<>(alternativeB));
+
+        StateObjectCollection<SH> result =
+                PrioritizedOperatorSubtaskState.Builder.tryComputeMixedLocalAndRemoteAlternative(
+                                new StateObjectCollection<>(jmState), alternatives, idExtractor)
+                        .get();
+
+        Assertions.assertEquals(4, result.size());
+        List<SH> expected = new ArrayList<>(alternativeA);
+        expected.add(alternativeB.get(0));
+        expected.add(jmState.get(2));
+        Assertions.assertTrue(result.containsAll(expected));

Review Comment:
   I find this test more difficult to follow than it could be.
   How about:
   1. Use `org.assertj.core.api.Assertions.assertThat(result).hasSameElementsAs(...)`
   2. Use constants for handles:
   ```
   SH remote0 = stateHandleFactory.apply(0);
   SH remote1 = stateHandleFactory.apply(1);
   ...
   List<SH> jmState = Arrays.asList(remote0, remote1, remote2, remote3);
   
   SH local0 = stateHandleFactory.apply(0);
   SH local3 = stateHandleFactory.apply(3);
   
   ...
   List<SH> alternativeA = Arrays.asList(...);
   List<SH> alternativeB = Arrays.asList(...);
   ...
   assertThat(result).hasSameElementsAs(Arrays.asList(local0, local3, local1, remote2));
   ```
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java:
##########
@@ -313,22 +317,121 @@ public PrioritizedOperatorSubtaskState build() {
                     restoredCheckpointId);
         }
 
+        /**
+         * This method creates an alternative recovery option by replacing as much job manager state
+         * with higher prioritized (=local) alternatives as possible.
+         *
+         * @param jobManagerState the state that the task got assigned from the job manager (this
+         *     state lives in remote storage).
+         * @param alternativesByPriority local alternatives to the job manager starte, ordered by

Review Comment:
   typo "starte"



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java:
##########
@@ -138,22 +108,43 @@ public void discardState() throws Exception {
 
     @Override
     public long getStateSize() {
-        return super.getStateSize() + metaDataState.getStateSize();
+        return directoryStateHandle.getStateSize() + metaStateHandle.getStateSize();
     }
 
     @Override
     public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + getMetaDataState().hashCode();
+        int result = directoryStateHandle.hashCode();
+        result = 31 * result + getKeyGroupRange().hashCode();
+        result = 31 * result + getMetaDataStateHandle().hashCode();

Review Comment:
   1. Doesn't `equals` need to be updated as well? I think it currently doesn't match `hashCode`.
   2. nit: move closer to `equals`?
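Here is a minimal sketch of an `equals` that compares the same three fields the new `hashCode` mixes in, with the two methods placed next to each other as the nit suggests. `LocalHandleSketch` is a hypothetical stand-in. Its field names follow the diff, but the field types are placeholders, not Flink's handle types.

```java
import java.util.Objects;

// Hypothetical simplified stand-in for IncrementalLocalKeyedStateHandle (placeholder field types).
final class LocalHandleSketch {
    private final Object directoryStateHandle;
    private final Object keyGroupRange;
    private final Object metaStateHandle;

    LocalHandleSketch(Object directoryStateHandle, Object keyGroupRange, Object metaStateHandle) {
        this.directoryStateHandle = Objects.requireNonNull(directoryStateHandle);
        this.keyGroupRange = Objects.requireNonNull(keyGroupRange);
        this.metaStateHandle = Objects.requireNonNull(metaStateHandle);
    }

    // Compares exactly the fields that hashCode() uses, so equal objects always hash equally.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        LocalHandleSketch that = (LocalHandleSketch) o;
        return directoryStateHandle.equals(that.directoryStateHandle)
                && keyGroupRange.equals(that.keyGroupRange)
                && metaStateHandle.equals(that.metaStateHandle);
    }

    // Same 31-based combination as in the diff, over the same three fields.
    @Override
    public int hashCode() {
        int result = directoryStateHandle.hashCode();
        result = 31 * result + keyGroupRange.hashCode();
        result = 31 * result + metaStateHandle.hashCode();
        return result;
    }
}
```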



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java:
##########
@@ -313,22 +317,121 @@ public PrioritizedOperatorSubtaskState build() {
                     restoredCheckpointId);
         }
 
+        /**
+         * This method creates an alternative recovery option by replacing as much job manager state
+         * with higher prioritized (=local) alternatives as possible.
+         *
+         * @param jobManagerState the state that the task got assigned from the job manager (this
+         *     state lives in remote storage).
+         * @param alternativesByPriority local alternatives to the job manager starte, ordered by
+         *     priority.
+         * @param identityExtractor function to extract an identifier from a state object.
+         * @return prioritized state alternatives.
+         * @param <STATE_OBJ_TYPE> the type of the state objects we process.
+         * @param <ID_TYPE> the type of object that represents the id of the state object type.
+         */
+        <STATE_OBJ_TYPE extends StateObject, ID_TYPE>
+                List<StateObjectCollection<STATE_OBJ_TYPE>> computePrioritizedAlternatives(
+                        StateObjectCollection<STATE_OBJ_TYPE> jobManagerState,
+                        List<StateObjectCollection<STATE_OBJ_TYPE>> alternativesByPriority,
+                        Function<STATE_OBJ_TYPE, ID_TYPE> identityExtractor) {
+
+            if (alternativesByPriority != null
+                    && !alternativesByPriority.isEmpty()
+                    && jobManagerState.hasState()) {
+
+                Optional<StateObjectCollection<STATE_OBJ_TYPE>> mergedAlternative =
+                        tryComputeMixedLocalAndRemoteAlternative(
+                                jobManagerState, alternativesByPriority, identityExtractor);
+
+                // Return the mix of local/remote state as first and pure remote state as second
+                // alternative (in case that we fail to recover from the local state, e.g. because
+                // of corruption).
+                if (mergedAlternative.isPresent()) {
+                    return Arrays.asList(mergedAlternative.get(), jobManagerState);
+                }
+            }
+
+            return Collections.singletonList(jobManagerState);
+        }
+
+        /**
+         * This method creates an alternative recovery option by replacing as much job manager state
+         * with higher prioritized (=local) alternatives as possible. Returns empty Optional if the
+         * JM state is empty or nothing could be replaced.
+         *
+         * @param jobManagerState the state that the task got assigned from the job manager (this
+         *     state lives in remote storage).
+         * @param alternativesByPriority local alternatives to the job manager starte, ordered by

Review Comment:
   typo "starte"
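The javadoc in this hunk describes the replacement only in prose. The following is a minimal sketch of one plausible reading of it, not the PR's implementation. The class name `MixedAlternativeSketch` is hypothetical, and the sketch makes two simplifications: it uses plain `List`s instead of `StateObjectCollection`, and it assumes ids are unique within the JM state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, simplified sketch; not the code from the PR.
final class MixedAlternativeSketch {

    // Replaces as many remote (JM) handles as possible with local handles that have the same id,
    // taking local handles from higher-priority alternatives first. Returns an empty Optional if
    // the JM state is empty or nothing could be replaced.
    static <T, ID> Optional<List<T>> tryComputeMixedLocalAndRemoteAlternative(
            List<T> jobManagerState, List<List<T>> alternativesByPriority, Function<T, ID> idExtractor) {
        if (jobManagerState.isEmpty()) {
            return Optional.empty();
        }
        // Remote handles that still lack a local replacement, keyed by id, in JM order.
        Map<ID, T> unreplaced = new LinkedHashMap<>();
        for (T remote : jobManagerState) {
            unreplaced.put(idExtractor.apply(remote), remote);
        }
        List<T> mixed = new ArrayList<>();
        for (List<T> alternative : alternativesByPriority) {
            for (T local : alternative) {
                // Use a local handle only if its id is still unclaimed; ids already covered by a
                // higher-priority alternative, or absent from the JM state, are skipped.
                if (unreplaced.remove(idExtractor.apply(local)) != null) {
                    mixed.add(local);
                }
            }
        }
        if (mixed.isEmpty()) {
            return Optional.empty();
        }
        // Whatever could not be replaced is still restored from remote storage.
        mixed.addAll(unreplaced.values());
        return Optional.of(mixed);
    }

    // Mixed local/remote alternative first, pure JM state second as the fallback.
    static <T, ID> List<List<T>> computePrioritizedAlternatives(
            List<T> jobManagerState, List<List<T>> alternativesByPriority, Function<T, ID> idExtractor) {
        if (alternativesByPriority != null && !alternativesByPriority.isEmpty()) {
            Optional<List<T>> mixed =
                    tryComputeMixedLocalAndRemoteAlternative(
                            jobManagerState, alternativesByPriority, idExtractor);
            if (mixed.isPresent()) {
                return Arrays.asList(mixed.get(), jobManagerState);
            }
        }
        return Collections.singletonList(jobManagerState);
    }
}
```

Take the handles from the test in this PR: JM ids 0 to 3, alternative A = {0, 3}, an empty alternative, and alternative B = {1, 3, 5}. The sketch's mixed alternative is then local 0, local 3, local 1 and remote 2. Local 3 from B is skipped because A already covers id 3, and local 5 has no JM counterpart.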



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java:
##########
@@ -51,6 +55,108 @@ class PrioritizedOperatorSubtaskStateTest {
 
     private static final Random RANDOM = new Random(0x42);
 
+    @Test
+    void testTryCreateMixedLocalAndRemoteAlternative() {
+        testTryCreateMixedLocalAndRemoteAlternative(
+                StateHandleDummyUtil::createKeyedStateHandleFromSeed,
+                KeyedStateHandle::getKeyGroupRange);
+    }
+
+    <SH extends StateObject, ID> void testTryCreateMixedLocalAndRemoteAlternative(
+            IntFunction<SH> stateHandleFactory, Function<SH, ID> idExtractor) {
+        List<SH> jmState =
+                Arrays.asList(
+                        stateHandleFactory.apply(0),
+                        stateHandleFactory.apply(1),
+                        stateHandleFactory.apply(2),
+                        stateHandleFactory.apply(3));
+
+        List<SH> alternativeA =
+                Arrays.asList(stateHandleFactory.apply(0), stateHandleFactory.apply(3));
+
+        List<SH> alternativeB =
+                Arrays.asList(
+                        stateHandleFactory.apply(1),
+                        stateHandleFactory.apply(3),
+                        stateHandleFactory.apply(5));
+
+        List<StateObjectCollection<SH>> alternatives =
+                Arrays.asList(
+                        new StateObjectCollection<>(alternativeA),
+                        new StateObjectCollection<>(Collections.emptyList()),
+                        new StateObjectCollection<>(alternativeB));
+
+        StateObjectCollection<SH> result =
+                PrioritizedOperatorSubtaskState.Builder.tryComputeMixedLocalAndRemoteAlternative(

Review Comment:
   It would be more reliable to test the public methods of `PrioritizedOperatorSubtaskState` (in addition to, or instead of, testing the package-private ones).
   Or would it be too complicated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.