XComp commented on a change in pull request #18901:
URL: https://github.com/apache/flink/pull/18901#discussion_r813145701



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -132,31 +197,27 @@ public KubernetesStateHandleStore(
 
         final RetrievableStateHandle<T> storeHandle = storage.store(state);
 
-        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
+        final byte[] serializedStoreHandle =
+                serializeOrDiscard(new 
StateHandleWithDeleteMarker<>(storeHandle));
 
         // initialize flag to serve the failure case
         boolean discardState = true;
         try {
             // a successful operation will result in the state not being 
discarded
             discardState =
-                    !kubeClient
-                            .checkAndUpdateConfigMap(
-                                    configMapName,
-                                    c -> {
-                                        if (isValidOperation(c)) {
-                                            if (!c.getData().containsKey(key)) 
{
-                                                c.getData()
-                                                        .put(
-                                                                key,
-                                                                
encodeStateHandle(
-                                                                        
serializedStoreHandle));
-                                                return Optional.of(c);
-                                            } else {
-                                                throw new CompletionException(
-                                                        
getKeyAlreadyExistException(key));
-                                            }
+                    !updateConfigMap(
+                                    configMap -> {
+                                        if 
(!configMap.getData().containsKey(key)) {
+                                            configMap
+                                                    .getData()
+                                                    .put(
+                                                            key,
+                                                            encodeStateHandle(
+                                                                    
serializedStoreHandle));
+                                            return Optional.of(configMap);
                                         }
-                                        return Optional.empty();
+                                        throw new CompletionException(

Review comment:
       Shouldn't we overwrite the old value when calling `addAndLock` on a 
marked-for-deletion entry instead of throwing the `AlreadyExistException`?

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -36,19 +38,34 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
 /** Tests for {@link KubernetesStateHandleStore} operations. */
 public class KubernetesStateHandleStoreTest extends 
KubernetesHighAvailabilityTestBase {
 
     private static final String PREFIX = "test-prefix-";
+
+    private static class FailingStateHandle implements StateObject {
+
+        @Override
+        public void discardState() throws Exception {}
+
+        @Override
+        public long getStateSize() {
+            return 0;
+        }
+    }
+

Review comment:
       `FailingStateHandle` is unused.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -379,39 +456,51 @@ public StringResourceVersion exists(String key) throws 
Exception {
     public boolean releaseAndTryRemove(String key) throws Exception {
         checkNotNull(key, "Key in ConfigMap.");
         final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = 
new AtomicReference<>();
-
-        return kubeClient
-                .checkAndUpdateConfigMap(
-                        configMapName,
+        return updateConfigMap(
                         configMap -> {
-                            if (isValidOperation(configMap)) {
-                                final String content = 
configMap.getData().remove(key);
-                                if (content != null) {
-                                    try {
-                                        
stateHandleRefer.set(deserializeObject(content));
-                                    } catch (IOException e) {
-                                        LOG.warn(
-                                                "Could not retrieve the state 
handle of {} from ConfigMap {}.",
-                                                key,
-                                                configMapName,
-                                                e);
+                            final String content = 
configMap.getData().get(key);
+                            if (content != null) {
+                                try {
+                                    final StateHandleWithDeleteMarker<T> 
result =
+                                            deserializeStateHandle(content);
+                                    if (!result.isMarkedForDeletion()) {
+                                        configMap
+                                                .getData()
+                                                .put(
+                                                        key,
+                                                        encodeStateHandle(
+                                                                
InstantiationUtil.serializeObject(
+                                                                        
result.toDeleting())));
                                     }
+                                    stateHandleRefer.set(result.getInner());
+                                } catch (IOException e) {
+                                    LOG.warn(
+                                            "Could not retrieve the state 
handle of {} from ConfigMap {}.",
+                                            key,
+                                            configMapName,
+                                            e);
+                                    // TODO log / comment
+                                    
Objects.requireNonNull(configMap.getData().remove(key));
                                 }
-                                return Optional.of(configMap);
                             }
-                            return Optional.empty();
+                            return Optional.of(configMap);
                         })
-                .whenComplete(
-                        (succeed, ignore) -> {
-                            if (succeed) {
-                                if (stateHandleRefer.get() != null) {
-                                    try {
-                                        stateHandleRefer.get().discardState();
-                                    } catch (Exception e) {
-                                        throw new CompletionException(e);
-                                    }
+                .thenCompose(
+                        updated -> {
+                            // We don't care whether the configmap has been 
updated or not

Review comment:
       nit: this comment can be avoided by simply renaming the `updated` 
parameter of this callback to something like `ignoredUpdatedResult`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -423,38 +512,45 @@ public boolean releaseAndTryRemove(String key) throws 
Exception {
      */
     @Override
     public void releaseAndTryRemoveAll() throws Exception {
+        final List<String> validKeys = new ArrayList<>();
         final List<RetrievableStateHandle<T>> validStateHandles = new 
ArrayList<>();
-        kubeClient
-                .checkAndUpdateConfigMap(
-                        configMapName,
-                        c -> {
-                            if (isValidOperation(c)) {
-                                final Map<String, String> updateData = new 
HashMap<>(c.getData());
-                                c.getData().entrySet().stream()
-                                        .filter(entry -> 
configMapKeyFilter.test(entry.getKey()))
-                                        .forEach(
-                                                entry -> {
-                                                    try {
-                                                        validStateHandles.add(
-                                                                
deserializeObject(
-                                                                        
entry.getValue()));
-                                                        
updateData.remove(entry.getKey());
-                                                    } catch (IOException e) {
-                                                        LOG.warn(
-                                                                "ConfigMap {} 
contained corrupted data. Ignoring the key {}.",
-                                                                configMapName,
-                                                                
entry.getKey());
-                                                    }
-                                                });
-                                c.getData().clear();
-                                c.getData().putAll(updateData);
-                                return Optional.of(c);
-                            }
-                            return Optional.empty();
+        updateConfigMap(
+                        configMap -> {
+                            final Map<String, String> updateData =
+                                    new HashMap<>(configMap.getData());
+                            configMap.getData().entrySet().stream()
+                                    .filter(entry -> 
configMapKeyFilter.test(entry.getKey()))
+                                    .forEach(
+                                            entry -> {
+                                                try {
+                                                    final 
StateHandleWithDeleteMarker<T> result =
+                                                            
deserializeStateHandle(
+                                                                    
entry.getValue());
+                                                    
validKeys.add(entry.getKey());
+                                                    
validStateHandles.add(result.getInner());
+                                                    updateData.put(
+                                                            entry.getKey(),
+                                                            encodeStateHandle(
+                                                                    
InstantiationUtil
+                                                                            
.serializeObject(
+                                                                               
     result
+                                                                               
             .toDeleting())));
+                                                } catch (IOException e) {
+                                                    LOG.warn(
+                                                            "ConfigMap {} 
contained corrupted data. Ignoring the key {}.",

Review comment:
       I guess we could clean up the entry here as well, couldn't we? That 
would be consistent with the `releaseAndTryRemove` implementation.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -69,13 +75,72 @@
  * the leader could update the store. Then we will completely get rid of the 
lock-and-release in
  * Zookeeper implementation.
  *
- * @param <T> Type of state
+ * @param <T> Type of the state we're storing.
  */
 public class KubernetesStateHandleStore<T extends Serializable>
         implements StateHandleStore<T, StringResourceVersion> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
 
+    private static <T extends Serializable> StateHandleWithDeleteMarker<T> 
deserializeStateHandle(

Review comment:
       Reading this, I was wondering whether we could just add a prefix to the 
Base64 content. This would enable us to filter right away without 
serializing/deserializing the data. Or is this "too hacky"? 🤔 

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +742,157 @@ public void testRemoveAllHandles() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            store.addAndLock(
+                                    key,
+                                    new 
TestingLongStateHandleHelper.LongStateHandle(2L) {
+
+                                        final AtomicBoolean thrown = new 
AtomicBoolean(false);
+
+                                        @Override
+                                        public void discardState() {
+                                            if (!thrown.getAndSet(true)) {
+                                                throw discardException;
+                                            }
+                                            super.discardState();
+                                        }
+                                    });
+
+                            assertThat(store.getAllAndLock().size(), is(1));
+                            assertThat(
+                                    store.getAndLock(key),
+                                    
is(notNullValue(RetrievableStateHandle.class)));
+
+                            // First remove attempt should fail when we're 
discarding the underlying
+                            // state.
+                            final Exception exception =
+                                    assertThrows(
+                                            Exception.class,
+                                            () -> {
+                                                store.releaseAndTryRemove(key);
+                                            });
+                            assertThat(exception, 
FlinkMatchers.containsCause(discardException));
+
+                            // Now we should see that the node is 
"soft-deleted". This means it can
+                            // no longer be accessed by the get methods, but 
the underlying state
+                            // still exists.
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThrows(Exception.class, () -> 
store.getAndLock(key));
+                            
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+                            
assertThat(getLeaderConfigMap().getData().containsKey(key), is(true));
+
+                            // Second retry should succeed and remove the 
underlying state and the
+                            // reference in config map.
+                            assertThat(store.releaseAndTryRemove(key), 
is(true));
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThrows(Exception.class, () -> 
store.getAndLock(key));
+                            
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1));
+                            
assertThat(getLeaderConfigMap().getData().containsKey(key), is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testReleaseAndTryRemoveAllIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+                            final int numKeys = 10;
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            for (int idx = 0; idx < numKeys; idx++) {
+                                store.addAndLock(
+                                        key + "_" + idx,
+                                        new 
TestingLongStateHandleHelper.LongStateHandle(idx + 1) {

Review comment:
       FYI: As already mentioned, I extended `LongStateHandle` to cover this 
functionality in PR #18869.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +742,157 @@ public void testRemoveAllHandles() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testReleaseAndTryRemoveIsIdempotent() throws Exception {

Review comment:
       Shouldn't we at least test `addAndLock` on a marked-for-deletion 
entry? 🤔 

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -379,39 +456,51 @@ public StringResourceVersion exists(String key) throws 
Exception {
     public boolean releaseAndTryRemove(String key) throws Exception {
         checkNotNull(key, "Key in ConfigMap.");
         final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = 
new AtomicReference<>();
-
-        return kubeClient
-                .checkAndUpdateConfigMap(
-                        configMapName,
+        return updateConfigMap(
                         configMap -> {
-                            if (isValidOperation(configMap)) {
-                                final String content = 
configMap.getData().remove(key);
-                                if (content != null) {
-                                    try {
-                                        
stateHandleRefer.set(deserializeObject(content));
-                                    } catch (IOException e) {
-                                        LOG.warn(
-                                                "Could not retrieve the state 
handle of {} from ConfigMap {}.",
-                                                key,
-                                                configMapName,
-                                                e);
+                            final String content = 
configMap.getData().get(key);
+                            if (content != null) {
+                                try {
+                                    final StateHandleWithDeleteMarker<T> 
result =
+                                            deserializeStateHandle(content);
+                                    if (!result.isMarkedForDeletion()) {
+                                        configMap
+                                                .getData()
+                                                .put(
+                                                        key,
+                                                        encodeStateHandle(
+                                                                
InstantiationUtil.serializeObject(
+                                                                        
result.toDeleting())));
                                     }
+                                    stateHandleRefer.set(result.getInner());
+                                } catch (IOException e) {
+                                    LOG.warn(
+                                            "Could not retrieve the state 
handle of {} from ConfigMap {}.",

Review comment:
       Shall we add a sentence to the comment stating that we're going to 
remove the entry?

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -208,30 +265,29 @@ public void replace(String key, StringResourceVersion 
resourceVersion, T state)
 
         final RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
-        final byte[] serializedStateHandle = 
serializeOrDiscard(newStateHandle);
+        final byte[] serializedStateHandle =
+                serializeOrDiscard(new 
StateHandleWithDeleteMarker<>(newStateHandle));
 
         // initialize flags to serve the failure case
         boolean discardOldState = false;
         boolean discardNewState = true;
         try {
             boolean success =
-                    kubeClient
-                            .checkAndUpdateConfigMap(
-                                    configMapName,
-                                    c -> {
-                                        if (isValidOperation(c)) {
+                    updateConfigMap(
+                                    configMap -> {
+                                        if (isValidOperation(configMap)) {

Review comment:
       ```suggestion
   ```
   The `isValidOperation` check is already performed in `updateConfigMap`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -208,30 +265,29 @@ public void replace(String key, StringResourceVersion 
resourceVersion, T state)
 
         final RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
-        final byte[] serializedStateHandle = 
serializeOrDiscard(newStateHandle);
+        final byte[] serializedStateHandle =
+                serializeOrDiscard(new 
StateHandleWithDeleteMarker<>(newStateHandle));
 
         // initialize flags to serve the failure case
         boolean discardOldState = false;
         boolean discardNewState = true;
         try {
             boolean success =
-                    kubeClient
-                            .checkAndUpdateConfigMap(
-                                    configMapName,
-                                    c -> {
-                                        if (isValidOperation(c)) {
+                    updateConfigMap(
+                                    configMap -> {
+                                        if (isValidOperation(configMap)) {
                                             // Check the existence
-                                            if (c.getData().containsKey(key)) {
-                                                c.getData()
+                                            if 
(configMap.getData().containsKey(key)) {

Review comment:
       Just thinking out loud here: This will overwrite a marked-for-deletion 
entry. But that's what we want here, I guess.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +742,157 @@ public void testRemoveAllHandles() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            store.addAndLock(
+                                    key,
+                                    new 
TestingLongStateHandleHelper.LongStateHandle(2L) {
+
+                                        final AtomicBoolean thrown = new 
AtomicBoolean(false);
+
+                                        @Override
+                                        public void discardState() {
+                                            if (!thrown.getAndSet(true)) {
+                                                throw discardException;
+                                            }
+                                            super.discardState();
+                                        }
+                                    });
+
+                            assertThat(store.getAllAndLock().size(), is(1));
+                            assertThat(
+                                    store.getAndLock(key),
+                                    
is(notNullValue(RetrievableStateHandle.class)));
+
+                            // First remove attempt should fail when we're 
discarding the underlying
+                            // state.
+                            final Exception exception =
+                                    assertThrows(
+                                            Exception.class,
+                                            () -> {
+                                                store.releaseAndTryRemove(key);
+                                            });

Review comment:
       ```suggestion
                               final Exception exception =
                                       assertThrows(
                                               Exception.class, () -> 
store.releaseAndTryRemove(key));
   ```
   nit: can be shortened

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -303,7 +359,12 @@ public StringResourceVersion exists(String key) throws 
Exception {
         if (optional.isPresent()) {

Review comment:
       I couldn't annotate the location directly in the GitHub UI, but we 
should also return `notExisting()` when the key exists but is already marked 
for deletion.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -379,39 +456,51 @@ public StringResourceVersion exists(String key) throws 
Exception {
     public boolean releaseAndTryRemove(String key) throws Exception {
         checkNotNull(key, "Key in ConfigMap.");
         final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = 
new AtomicReference<>();
-
-        return kubeClient
-                .checkAndUpdateConfigMap(
-                        configMapName,
+        return updateConfigMap(
                         configMap -> {
-                            if (isValidOperation(configMap)) {
-                                final String content = 
configMap.getData().remove(key);
-                                if (content != null) {
-                                    try {
-                                        
stateHandleRefer.set(deserializeObject(content));
-                                    } catch (IOException e) {
-                                        LOG.warn(
-                                                "Could not retrieve the state 
handle of {} from ConfigMap {}.",
-                                                key,
-                                                configMapName,
-                                                e);
+                            final String content = 
configMap.getData().get(key);
+                            if (content != null) {

Review comment:
       Inverting the condition would remove one level of indentation and might 
improve readability.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to