zentol commented on a change in pull request #19121:
URL: https://github.com/apache/flink/pull/19121#discussion_r829068119



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -496,12 +498,14 @@ public boolean releaseAndTryRemove(String key) throws 
Exception {
                                     
Objects.requireNonNull(configMap.getData().remove(key));
                                 }
                                 return Optional.of(configMap);
+                            } else {
+                                stateHandleDoesNotExist.set(true);
                             }
                             return Optional.empty();
                         })
                 .thenCompose(
                         updated -> {
-                            if (updated && stateHandleRefer.get() != null) {
+                            if (stateHandleRefer.get() != null) {

Review comment:
       Why are we now assuming that update == true if the ref isn't null? Even 
if it's currently the case, that seems to rely on implementation details of the 
KubeClient.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -464,13 +465,14 @@ public StringResourceVersion exists(String key) throws 
Exception {
      * It returns the {@link RetrievableStateHandle} stored under the given 
state node if any.
      *
      * @param key Key to be removed from ConfigMap
-     * @return True if the state handle is removed successfully
+     * @return True if the state handle isn't listed anymore.
      * @throws Exception if removing the key or discarding the state failed
      */
     @Override
     public boolean releaseAndTryRemove(String key) throws Exception {
         checkNotNull(key, "Key in ConfigMap.");
-        final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = 
new AtomicReference<>();
+        final AtomicReference<StateObject> stateHandleRefer = new 
AtomicReference<>();
+        final AtomicBoolean stateHandleDoesNotExist = new AtomicBoolean(false);

Review comment:
       :/
   
   it's not really nice that we need this secondary channel to get results from 
the update function.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
##########
@@ -220,14 +221,23 @@ public void testGlobalCleanupWithNonExistName() throws 
Exception {
                 .globalCleanupAsync(testingJobGraph.getJobID(), 
Executors.directExecutor())
                 .join();
 
-        try {
-            removeFuture.get(timeout, TimeUnit.MILLISECONDS);
-            fail(
-                    "We should get an expected timeout because we are removing 
a non-existed job graph.");
-        } catch (TimeoutException ex) {
-            // expected
-        }
-        assertThat(removeFuture.isDone(), is(false));
+        removeFuture.join();
+        assertThat(removeFuture.isDone(), is(true));

Review comment:
       there's no way for this to fail.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -803,6 +803,34 @@ public void testRemove() throws Exception {
         };
     }
 
+    @Test
+    public void testRemoveOfNonExistingState() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThat(store.releaseAndTryRemove(key), 
is(true));
+                            assertThat(store.getAllAndLock().size(), is(0));
+
+                            // State should also be discarded.

Review comment:
       shouldn't the count then be 1?...

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -803,6 +803,34 @@ public void testRemove() throws Exception {
         };
     }
 
+    @Test
+    public void testRemoveOfNonExistingState() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThat(store.releaseAndTryRemove(key), 
is(true));
+                            assertThat(store.getAllAndLock().size(), is(0));
+
+                            // State should also be discarded.

Review comment:
       I'm referring to the count that is asserted directly below. I agree that 
it should be 0.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to