dmvk commented on a change in pull request #18901:
URL: https://github.com/apache/flink/pull/18901#discussion_r814734530



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -423,38 +512,45 @@ public boolean releaseAndTryRemove(String key) throws 
Exception {
      */
     @Override
     public void releaseAndTryRemoveAll() throws Exception {
+        final List<String> validKeys = new ArrayList<>();
         final List<RetrievableStateHandle<T>> validStateHandles = new 
ArrayList<>();
-        kubeClient
-                .checkAndUpdateConfigMap(
-                        configMapName,
-                        c -> {
-                            if (isValidOperation(c)) {
-                                final Map<String, String> updateData = new 
HashMap<>(c.getData());
-                                c.getData().entrySet().stream()
-                                        .filter(entry -> 
configMapKeyFilter.test(entry.getKey()))
-                                        .forEach(
-                                                entry -> {
-                                                    try {
-                                                        validStateHandles.add(
-                                                                
deserializeObject(
-                                                                        
entry.getValue()));
-                                                        
updateData.remove(entry.getKey());
-                                                    } catch (IOException e) {
-                                                        LOG.warn(
-                                                                "ConfigMap {} 
contained corrupted data. Ignoring the key {}.",
-                                                                configMapName,
-                                                                
entry.getKey());
-                                                    }
-                                                });
-                                c.getData().clear();
-                                c.getData().putAll(updateData);
-                                return Optional.of(c);
-                            }
-                            return Optional.empty();
+        updateConfigMap(
+                        configMap -> {
+                            final Map<String, String> updateData =
+                                    new HashMap<>(configMap.getData());
+                            configMap.getData().entrySet().stream()
+                                    .filter(entry -> 
configMapKeyFilter.test(entry.getKey()))
+                                    .forEach(
+                                            entry -> {
+                                                try {
+                                                    final 
StateHandleWithDeleteMarker<T> result =
+                                                            
deserializeStateHandle(
+                                                                    
entry.getValue());
+                                                    
validKeys.add(entry.getKey());
+                                                    
validStateHandles.add(result.getInner());
+                                                    updateData.put(
+                                                            entry.getKey(),
+                                                            encodeStateHandle(
+                                                                    
InstantiationUtil
+                                                                            
.serializeObject(
+                                                                               
     result
+                                                                               
             .toDeleting())));
+                                                } catch (IOException e) {
+                                                    LOG.warn(
+                                                            "ConfigMap {} 
contained corrupted data. Ignoring the key {}.",

Review comment:
       That's done as a side effect of not adding the key into the 
"updateData" map. I'll try to make it more explicit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to