pnowojski commented on a change in pull request #14953:
URL: https://github.com/apache/flink/pull/14953#discussion_r579086417



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
##########
@@ -89,37 +82,6 @@ public void testSuspendDiscardsCheckpoints() throws Exception {
         verifyCheckpointDiscarded(taskStates);
     }
 
-    /**
-     * Tests that the checkpoint does not exist in the store when we fail to add it into the store
-     * (i.e., there exists an exception thrown by the method).
-     */
-    @Test
-    public void testAddCheckpointWithFailedRemove() throws Exception {
-
-        final int numCheckpointsToRetain = 1;
-        CompletedCheckpointStore store =
-                createCompletedCheckpoints(numCheckpointsToRetain, Executors.directExecutor());
-
-        for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-            CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
-            doReturn(i).when(checkpointToAdd).getCheckpointID();
-            doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-            doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume();
-
-            try {
-                store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {});
-
-                // The checkpoint should be in the store if we successfully add it into the store.
-                List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
-                assertTrue(addedCheckpoints.contains(checkpointToAdd));
-            } catch (Exception e) {
-                // The checkpoint should not be in the store if any exception is thrown.
-                List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
-                assertFalse(addedCheckpoints.contains(checkpointToAdd));
-            }
-        }
-    }
-

Review comment:
       Those tests were added in FLINK-6027:
   https://github.com/apache/flink/pull/3521/files
   
   I think the intention was to guard against a potential future regression?
   
   Also looking at the test code, it looks mostly fine (% mockito). I don't get the `catch (Exception e)` part; it looks like dead code that isn't testing anything. But I think the main happy path is actually a valid test.
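   
   For reference, a minimal sketch of what keeping only the happy path could look like (reusing the helpers and mocks from the removed test; untested):
   ```java
   /**
    * Tests that a checkpoint is in the store once it has been added successfully, even if
    * discarding a subsumed checkpoint fails afterwards.
    */
   @Test
   public void testAddCheckpointWithFailedRemove() throws Exception {
       final int numCheckpointsToRetain = 1;
       CompletedCheckpointStore store =
               createCompletedCheckpoints(numCheckpointsToRetain, Executors.directExecutor());
   
       for (long i = 0; i <= numCheckpointsToRetain; ++i) {
           CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
           doReturn(i).when(checkpointToAdd).getCheckpointID();
           doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
           // Simulate a failure while discarding the subsumed checkpoint.
           doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume();
   
           store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {});
   
           // Once addCheckpoint has returned successfully, the checkpoint must be retrievable.
           assertTrue(store.getAllCheckpoints().contains(checkpointToAdd));
       }
   }
   ```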

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
##########
@@ -230,15 +231,15 @@ public void addCheckpoint(
 
         completedCheckpoints.addLast(checkpoint);
 
-        // Everything worked, let's remove a previous checkpoint if necessary.
-        while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
-            final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
-            tryRemoveCompletedCheckpoint(
-                    completedCheckpoint,
-                    completedCheckpoint.shouldBeDiscardedOnSubsume(),
-                    checkpointsCleaner,
-                    postCleanup);
-        }
+        subsume(

Review comment:
       I wouldn't static import such things. Only the most common/important or otherwise self-explanatory static imports are helpful IMO. In this case I was quite confused about where this `subsume` method was coming from, and only later I noticed that it's actually a static import.
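   
   For illustration, a qualified call at the call site would read roughly like this (the lambda is just my guess at how the subsume action is wired up):
   ```java
   // The origin of subsume() is obvious when the helper is named at the call site.
   CheckpointSubsumeHelper.subsume(
           completedCheckpoints,
           maxNumberOfCheckpointsToRetain,
           completedCheckpoint ->
                   tryRemoveCompletedCheckpoint(
                           completedCheckpoint,
                           completedCheckpoint.shouldBeDiscardedOnSubsume(),
                           checkpointsCleaner,
                           postCleanup));
   ```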

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -34,14 +38,46 @@ public static void subsume(
         if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
             return;
         }
+        CompletedCheckpoint last = checkpoints.peekLast();
+        CompletedCheckpoint lastNotSavepoint = getLastNotSavepoint(checkpoints);

Review comment:
       Could you maybe rename `last` to `latest`? It took me quite a bit to wrap my head around what's first and what's last.
   
   For example, the comment below:
   ```
   // Don't remove the last non-savepoint lest invalidate future incremental snapshots
   ```
   would be easier to understand as:
   ```
   // Don't remove the latest non-savepoint lest invalidate future incremental snapshots
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -34,14 +38,46 @@ public static void subsume(
         if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
             return;
         }
+        CompletedCheckpoint last = checkpoints.peekLast();
+        CompletedCheckpoint lastNotSavepoint = getLastNotSavepoint(checkpoints);
+        Iterator<CompletedCheckpoint> iterator = checkpoints.iterator();
+        while (checkpoints.size() > numRetain && iterator.hasNext()) {
+            CompletedCheckpoint next = iterator.next();
+            if (canSubsume(next, last, lastNotSavepoint)) {
+                iterator.remove();
+                try {
+                    subsumeAction.accept(next);
+                } catch (Exception e) {
+                    LOG.warn("Fail to subsume the old checkpoint.", e);
+                }
+            }
+            // Don't break out from the loop to subsume intermediate savepoints
+        }
+    }
 
-        while (checkpoints.size() > numRetain) {
-            CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst();
-            try {
-                subsumeAction.accept(completedCheckpoint);
-            } catch (Exception e) {
-                LOG.warn("Fail to subsume the old checkpoint.", e);
+    @Nullable
+    private static CompletedCheckpoint getLastNotSavepoint(Deque<CompletedCheckpoint> completed) {
+        Iterator<CompletedCheckpoint> descendingIterator = completed.descendingIterator();
+        while (descendingIterator.hasNext()) {
+            CompletedCheckpoint next = descendingIterator.next();
+            if (!next.getProperties().isSavepoint()) {
+                return next;
             }
         }
+        return null;

Review comment:
       Return `Optional`. You have already forgotten to carry the `@Nullable` annotation over to `canSubsume` and to the `lastNotSavepoint` local variable.
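   
   A minimal sketch of what I have in mind (also picking up the `latest` naming suggested above):
   ```java
   private static Optional<CompletedCheckpoint> getLatestNotSavepoint(
           Deque<CompletedCheckpoint> completed) {
       // Walk from the newest checkpoint backwards and return the first non-savepoint.
       Iterator<CompletedCheckpoint> descendingIterator = completed.descendingIterator();
       while (descendingIterator.hasNext()) {
           CompletedCheckpoint next = descendingIterator.next();
           if (!next.getProperties().isSavepoint()) {
               return Optional.of(next);
           }
       }
       return Optional.empty();
   }
   ```
   That way neither the local variable nor the `canSubsume` parameter needs a `@Nullable` annotation.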

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+
+class CheckpointSubsumeHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
+
+    public static void subsume(
+            Deque<CompletedCheckpoint> checkpoints,
+            int numRetain,
+            ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)

Review comment:
       I think you are using these kinds of function pointers a bit too much; they hurt readability. To understand what is happening in this piece of code, one needs to jump back and forth between many places (this method and all of its call sites).
   
   At the very least I would introduce a dedicated named interface for the `subsumeAction`, so that there is one less hop when someone wants to find this subsume action (find implementations would work). But that's still not perfect.
   
   But why not change the contract and return a collection/stream of checkpoints to subsume? That would make this helper method simpler and easier to test, and it would avoid the whole problem (see the sketch below).
   
   If you want, you could still provide two wrappers around the `Collection<Checkpoint> extractCheckpointsToSubsume()` (one for `tryRemoveCompletedCheckpoint` and the other for `CompletedCheckpoint::discardOnSubsume`).
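   
   Roughly something like this (name and shape are just a suggestion; it keeps the existing `canSubsume`/`getLastNotSavepoint` helpers):
   ```java
   /**
    * Removes the checkpoints that can be subsumed from the deque and returns them,
    * leaving the actual discarding to the caller.
    */
   static List<CompletedCheckpoint> extractCheckpointsToSubsume(
           Deque<CompletedCheckpoint> checkpoints, int numRetain) {
       List<CompletedCheckpoint> subsumed = new ArrayList<>();
       if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
           return subsumed;
       }
       CompletedCheckpoint last = checkpoints.peekLast();
       CompletedCheckpoint lastNotSavepoint = getLastNotSavepoint(checkpoints);
       Iterator<CompletedCheckpoint> iterator = checkpoints.iterator();
       while (checkpoints.size() > numRetain && iterator.hasNext()) {
           CompletedCheckpoint next = iterator.next();
           if (canSubsume(next, last, lastNotSavepoint)) {
               iterator.remove();
               subsumed.add(next);
           }
       }
       return subsumed;
   }
   ```
   The two wrappers would then just iterate over the returned list and call `tryRemoveCompletedCheckpoint` or `discardOnSubsume`, respectively.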

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+
+class CheckpointSubsumeHelper {

Review comment:
       Some Javadoc with a couple of words of explanation?
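   
   Maybe something along these lines (wording is just a suggestion based on my reading of the logic):
   ```java
   /**
    * Helper that encapsulates the subsumption logic shared by the completed checkpoint stores:
    * once more than {@code numRetain} checkpoints are kept, older ones are handed to the given
    * subsume action, while the latest non-savepoint is always retained so that future
    * incremental snapshots are not invalidated.
    */
   class CheckpointSubsumeHelper {
   ```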




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

