rkhachatryan commented on code in PR #23591: URL: https://github.com/apache/flink/pull/23591#discussion_r1373904163
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java: ########## @@ -138,22 +108,43 @@ public void discardState() throws Exception { @Override public long getStateSize() { - return super.getStateSize() + metaDataState.getStateSize(); + return directoryStateHandle.getStateSize() + metaStateHandle.getStateSize(); } @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + getMetaDataState().hashCode(); + int result = directoryStateHandle.hashCode(); + result = 31 * result + getKeyGroupRange().hashCode(); + result = 31 * result + getMetaDataStateHandle().hashCode(); return result; } @Override public String toString() { return "IncrementalLocalKeyedStateHandle{" + "metaDataState=" - + metaDataState + + metaStateHandle + "} " - + super.toString(); + + "DirectoryKeyedStateHandle{" + + "directoryStateHandle=" + + directoryStateHandle + + ", keyGroupRange=" + + keyGroupRange + + '}'; + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) { + // Nothing to do, this is for local use only. Review Comment: nit: throw an exception? 
########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java: ########## @@ -51,6 +55,108 @@ class PrioritizedOperatorSubtaskStateTest { private static final Random RANDOM = new Random(0x42); + @Test + void testTryCreateMixedLocalAndRemoteAlternative() { + testTryCreateMixedLocalAndRemoteAlternative( + StateHandleDummyUtil::createKeyedStateHandleFromSeed, + KeyedStateHandle::getKeyGroupRange); + } + + <SH extends StateObject, ID> void testTryCreateMixedLocalAndRemoteAlternative( + IntFunction<SH> stateHandleFactory, Function<SH, ID> idExtractor) { + List<SH> jmState = + Arrays.asList( + stateHandleFactory.apply(0), + stateHandleFactory.apply(1), + stateHandleFactory.apply(2), + stateHandleFactory.apply(3)); + + List<SH> alternativeA = + Arrays.asList(stateHandleFactory.apply(0), stateHandleFactory.apply(3)); + + List<SH> alternativeB = + Arrays.asList( + stateHandleFactory.apply(1), + stateHandleFactory.apply(3), + stateHandleFactory.apply(5)); + + List<StateObjectCollection<SH>> alternatives = + Arrays.asList( + new StateObjectCollection<>(alternativeA), + new StateObjectCollection<>(Collections.emptyList()), + new StateObjectCollection<>(alternativeB)); + + StateObjectCollection<SH> result = + PrioritizedOperatorSubtaskState.Builder.tryComputeMixedLocalAndRemoteAlternative( + new StateObjectCollection<>(jmState), alternatives, idExtractor) + .get(); + + Assertions.assertEquals(4, result.size()); + List<SH> expected = new ArrayList<>(alternativeA); + expected.add(alternativeB.get(0)); + expected.add(jmState.get(2)); + Assertions.assertTrue(result.containsAll(expected)); Review Comment: I find this test more difficult to follow than it could be. How about: 1. Use `org.assertj.core.api.Assertions.assertThat(result).hasSameElementsAs(...)` 2. Use constants for handles: ``` SH remote0 = stateHandleFactory.apply(0); SH remote1 = stateHandleFactory.apply(1); ... 
List<SH> jmState = Arrays.asList(remote0, remote1, remote2, remote3); SH local0 = stateHandleFactory.apply(0); SH local3 = stateHandleFactory.apply(3); ... List<SH> alternativeA = Arrays.asList(...); List<SH> alternativeB = Arrays.asList(...); ... assertThat(result).hasSameElementsAs(Arrays.asList(local0, local3, local1, remote2)); ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java: ########## @@ -313,22 +317,121 @@ public PrioritizedOperatorSubtaskState build() { restoredCheckpointId); } + /** + * This method creates an alternative recovery option by replacing as much job manager state + * with higher prioritized (=local) alternatives as possible. + * + * @param jobManagerState the state that the task got assigned from the job manager (this + * state lives in remote storage). + * @param alternativesByPriority local alternatives to the job manager starte, ordered by Review Comment: typo "starte" ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java: ########## @@ -138,22 +108,43 @@ public void discardState() throws Exception { @Override public long getStateSize() { - return super.getStateSize() + metaDataState.getStateSize(); + return directoryStateHandle.getStateSize() + metaStateHandle.getStateSize(); } @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + getMetaDataState().hashCode(); + int result = directoryStateHandle.hashCode(); + result = 31 * result + getKeyGroupRange().hashCode(); + result = 31 * result + getMetaDataStateHandle().hashCode(); Review Comment: 1. Doesn't `equals` need to be updated as well? I think it currently doesn't match `hashCode` 2. nit: move closer to `equals`? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java: ########## @@ -313,22 +317,121 @@ public PrioritizedOperatorSubtaskState build() { restoredCheckpointId); } + /** + * This method creates an alternative recovery option by replacing as much job manager state + * with higher prioritized (=local) alternatives as possible. + * + * @param jobManagerState the state that the task got assigned from the job manager (this + * state lives in remote storage). + * @param alternativesByPriority local alternatives to the job manager starte, ordered by + * priority. + * @param identityExtractor function to extract an identifier from a state object. + * @return prioritized state alternatives. + * @param <STATE_OBJ_TYPE> the type of the state objects we process. + * @param <ID_TYPE> the type of object that represents the id the state object type. + */ + <STATE_OBJ_TYPE extends StateObject, ID_TYPE> + List<StateObjectCollection<STATE_OBJ_TYPE>> computePrioritizedAlternatives( + StateObjectCollection<STATE_OBJ_TYPE> jobManagerState, + List<StateObjectCollection<STATE_OBJ_TYPE>> alternativesByPriority, + Function<STATE_OBJ_TYPE, ID_TYPE> identityExtractor) { + + if (alternativesByPriority != null + && !alternativesByPriority.isEmpty() + && jobManagerState.hasState()) { + + Optional<StateObjectCollection<STATE_OBJ_TYPE>> mergedAlternative = + tryComputeMixedLocalAndRemoteAlternative( + jobManagerState, alternativesByPriority, identityExtractor); + + // Return the mix of local/remote state as first and pure remote state as second + // alternative (in case that we fail to recover from the local state, e.g. because + // of corruption). 
+ if (mergedAlternative.isPresent()) { + return Arrays.asList(mergedAlternative.get(), jobManagerState); + } + } + + return Collections.singletonList(jobManagerState); + } + + /** + * This method creates an alternative recovery option by replacing as much job manager state + * with higher prioritized (=local) alternatives as possible. Returns empty Optional if the + * JM state is empty or nothing could be replaced. + * + * @param jobManagerState the state that the task got assigned from the job manager (this + * state lives in remote storage). + * @param alternativesByPriority local alternatives to the job manager starte, ordered by Review Comment: typo "starte" ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java: ########## @@ -51,6 +55,108 @@ class PrioritizedOperatorSubtaskStateTest { private static final Random RANDOM = new Random(0x42); + @Test + void testTryCreateMixedLocalAndRemoteAlternative() { + testTryCreateMixedLocalAndRemoteAlternative( + StateHandleDummyUtil::createKeyedStateHandleFromSeed, + KeyedStateHandle::getKeyGroupRange); + } + + <SH extends StateObject, ID> void testTryCreateMixedLocalAndRemoteAlternative( + IntFunction<SH> stateHandleFactory, Function<SH, ID> idExtractor) { + List<SH> jmState = + Arrays.asList( + stateHandleFactory.apply(0), + stateHandleFactory.apply(1), + stateHandleFactory.apply(2), + stateHandleFactory.apply(3)); + + List<SH> alternativeA = + Arrays.asList(stateHandleFactory.apply(0), stateHandleFactory.apply(3)); + + List<SH> alternativeB = + Arrays.asList( + stateHandleFactory.apply(1), + stateHandleFactory.apply(3), + stateHandleFactory.apply(5)); + + List<StateObjectCollection<SH>> alternatives = + Arrays.asList( + new StateObjectCollection<>(alternativeA), + new StateObjectCollection<>(Collections.emptyList()), + new StateObjectCollection<>(alternativeB)); + + StateObjectCollection<SH> result = + 
PrioritizedOperatorSubtaskState.Builder.tryComputeMixedLocalAndRemoteAlternative( Review Comment: It would be more reliable to test the public methods of `PrioritizedOperatorSubtaskState` (in addition to, or instead of, testing the package-private ones). Or would that be too complicated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org