zentol commented on a change in pull request #19121: URL: https://github.com/apache/flink/pull/19121#discussion_r829068119
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -496,12 +498,14 @@ public boolean releaseAndTryRemove(String key) throws Exception { Objects.requireNonNull(configMap.getData().remove(key)); } return Optional.of(configMap); + } else { + stateHandleDoesNotExist.set(true); } return Optional.empty(); }) .thenCompose( updated -> { - if (updated && stateHandleRefer.get() != null) { + if (stateHandleRefer.get() != null) { Review comment: Why are we now assuming that `updated == true` whenever `stateHandleRefer` isn't null? Even if that happens to hold today, it relies on implementation details of the KubeClient. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -464,13 +465,14 @@ public StringResourceVersion exists(String key) throws Exception { * It returns the {@link RetrievableStateHandle} stored under the given state node if any. * * @param key Key to be removed from ConfigMap - * @return True if the state handle is removed successfully + * @return True if the state handle isn't listed anymore. * @throws Exception if removing the key or discarding the state failed */ @Override public boolean releaseAndTryRemove(String key) throws Exception { checkNotNull(key, "Key in ConfigMap."); - final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>(); + final AtomicReference<StateObject> stateHandleRefer = new AtomicReference<>(); + final AtomicBoolean stateHandleDoesNotExist = new AtomicBoolean(false); Review comment: :/ It's not very nice that we need this secondary channel (`stateHandleDoesNotExist`) to get results out of the update function. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java ########## @@ -220,14 +221,23 @@ public void testGlobalCleanupWithNonExistName() throws Exception { .globalCleanupAsync(testingJobGraph.getJobID(), Executors.directExecutor()) .join(); - try { - removeFuture.get(timeout, TimeUnit.MILLISECONDS); - fail( - "We should get an expected timeout because we are removing a non-existed job graph."); - } catch (TimeoutException ex) { - // expected - } - assertThat(removeFuture.isDone(), is(false)); + removeFuture.join(); + assertThat(removeFuture.isDone(), is(true)); Review comment: This assertion can never fail: `join()` only returns once `removeFuture` has completed, so `isDone()` is always true at this point. ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java ########## @@ -803,6 +803,34 @@ public void testRemove() throws Exception { }; } + @Test + public void testRemoveOfNonExistingState() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + assertThat(store.getAllAndLock().size(), is(0)); + assertThat(store.releaseAndTryRemove(key), is(true)); + assertThat(store.getAllAndLock().size(), is(0)); + + // State should also be discarded. Review comment: Shouldn't the count asserted below then be 1? 
########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java ########## @@ -803,6 +803,34 @@ public void testRemove() throws Exception { }; } + @Test + public void testRemoveOfNonExistingState() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + assertThat(store.getAllAndLock().size(), is(0)); + assertThat(store.releaseAndTryRemove(key), is(true)); + assertThat(store.getAllAndLock().size(), is(0)); + + // State should also be discarded. Review comment: I'm referring to the count that is asserted directly below. I agree that it should be 0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org