pnowojski commented on a change in pull request #17774:
URL: https://github.com/apache/flink/pull/17774#discussion_r767581589



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -97,12 +98,20 @@
 
     /**
      * Stores the materialized sstable files from all snapshots that build the incremental history.
+     * Used to check whether {@link PlaceholderStreamStateHandle} can be send or the original {@link
+     * StreamStateHandle} must be used.

Review comment:
       Can you explain when we should send the placeholder handles and when the original ones? (Maybe put it into the Javadoc?)
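A minimal sketch of the rule the Javadoc could spell out. It assumes the semantics implied by this diff: a file that belongs to a confirmed checkpoint can be sent as a placeholder, because the JM replaces it with the original from the shared state registry; a file uploaded by an earlier checkpoint that is not (yet) confirmed may have no registry entry to resolve against, so its original handle is sent; anything else is uploaded. `SstReuse`, `Decision` and `decide(...)` are hypothetical names, and `String` stands in for `StateHandleID` and `StreamStateHandle`.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: how one SST file could be represented in the next incremental checkpoint. */
final class SstReuse {

    enum Decision {
        /** Part of a confirmed checkpoint: the JM's shared state registry holds the original. */
        PLACEHOLDER,
        /** Uploaded by an earlier checkpoint that is not (yet) confirmed: send the original handle. */
        REUSE_UPLOADED,
        /** Unknown to both: the file has to be uploaded. */
        UPLOAD
    }

    private SstReuse() {}

    /**
     * @param fileName SST file name (stands in for StateHandleID)
     * @param confirmedSstFiles files of the last confirmed checkpoint, or null if there is none
     * @param uploadedSstFiles file name to handle of the last upload (String stands in for the handle)
     */
    static Decision decide(
            String fileName, Set<String> confirmedSstFiles, Map<String, String> uploadedSstFiles) {
        // assumed order: prefer the placeholder whenever the file is known to be confirmed
        if (confirmedSstFiles != null && confirmedSstFiles.contains(fileName)) {
            return Decision.PLACEHOLDER;
        }
        if (uploadedSstFiles.containsKey(fileName)) {
            return Decision.REUSE_UPLOADED;
        }
        return Decision.UPLOAD;
    }
}
```

The order of the two checks is part of the assumption: this sketch prefers the placeholder whenever the file is known to be confirmed.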

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -466,16 +496,11 @@ private void createUploadFilePaths(
                 final StateHandleID stateHandleID = new StateHandleID(fileName);
 
                 if (fileName.endsWith(SST_FILE_SUFFIX)) {
-                    final boolean existsAlready =
-                            baseSstFiles != null && baseSstFiles.contains(stateHandleID);
-
-                    if (existsAlready) {
-                        // we introduce a placeholder state handle, that is replaced with the
-                        // original from the shared state registry (created from a previous
-                        // checkpoint)
-                        sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
+                    Optional<StreamStateHandle> uploaded = previousSnapshot.get(stateHandleID);
+                    if (uploaded.isPresent()) {
+                        sstFiles.put(stateHandleID, uploaded.get());
                     } else {
-                        sstFilePaths.put(stateHandleID, filePath);
+                        sstFilePaths.put(stateHandleID, filePath); // re-upload
                     }

Review comment:
       Maybe inline the `previousSnapshot.get()` logic here, but extract this whole snippet into a `handleSstFile(...)` method? I have a feeling that `previousSnapshot.get()` and the logic here are too intertwined with this code, and at the same time the code is not long enough to require splitting it into even smaller methods.
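A rough sketch of the suggested shape, with the lookup inlined so the whole decision for one file reads in one place. It assumes `previousSnapshot.get()` returns a placeholder for confirmed files and the uploaded handle for unconfirmed ones; `SstFileRouting`, the `String` handles and the `PLACEHOLDER` marker are hypothetical stand-ins for the real Flink types.

```java
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the suggested extraction; all names and types are stand-ins. */
final class SstFileRouting {

    /** Stands in for new PlaceholderStreamStateHandle(). */
    static final String PLACEHOLDER = "<placeholder>";

    final Set<String> confirmedSstFiles = new HashSet<>(); // files of the last confirmed checkpoint
    final Map<String, String> uploadedSstFiles = new HashMap<>(); // last upload, maybe unconfirmed
    final Map<String, String> sstFiles = new HashMap<>(); // handles sent without (re-)uploading
    final Map<String, Path> sstFilePaths = new HashMap<>(); // files that still have to be uploaded

    /** Routes one SST file either to sstFiles (reuse) or to sstFilePaths (upload). */
    void handleSstFile(String stateHandleId, Path filePath) {
        if (confirmedSstFiles.contains(stateHandleId)) {
            // the JM replaces the placeholder with the original from its shared state registry
            sstFiles.put(stateHandleId, PLACEHOLDER);
            return;
        }
        String uploaded = uploadedSstFiles.get(stateHandleId);
        if (uploaded != null) {
            // uploaded before but not confirmed: send the original handle
            sstFiles.put(stateHandleId, uploaded);
        } else {
            sstFilePaths.put(stateHandleId, filePath); // (re-)upload
        }
    }
}
```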

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -561,4 +586,37 @@ public void release() {
             }
         }
     }
+
+    private static class PreviousSnapshot {
+
+        @Nullable private final Set<StateHandleID> confirmedSstFiles;
+        private final Map<StateHandleID, StreamStateHandle> uploadedSstFiles;
+
+        private PreviousSnapshot(
+                @Nullable Set<StateHandleID> confirmedSstFiles,
+                @Nonnull Map<StateHandleID, StreamStateHandle> uploadedSstFiles) {
+            this.confirmedSstFiles = confirmedSstFiles;
+            this.uploadedSstFiles = Preconditions.checkNotNull(uploadedSstFiles);
+        }
+
+        private Optional<StreamStateHandle> get(StateHandleID stateHandleID) {

Review comment:
       1. Maybe rename it to `getAlreadyUploadedHandles()`?
   2. Add a Javadoc explaining this method?

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -97,12 +98,20 @@
 
     /**
      * Stores the materialized sstable files from all snapshots that build the incremental history.
+     * Used to check whether {@link PlaceholderStreamStateHandle} can be send or the original {@link
+     * StreamStateHandle} must be used.
      */
     @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
 
+    /** Uploaded but not yet confirmed SST files. Used to avoid re-uploading. */
+    @Nonnull private final Map<StateHandleID, StreamStateHandle> lastUploadedSstFiles;

Review comment:
       Is the Javadoc accurate? Shouldn't it be "uploaded but potentially not yet confirmed SST files"? It looks like those files could already be confirmed.
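A small model of why the wording may be off. It assumes, as far as this diff shows, that `lastUploadedSstFiles` is only replaced when a newer upload finishes and is not cleared when a checkpoint is confirmed. `LastUploadedModel` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the lastUploadedSstFiles bookkeeping shown in the diff. */
final class LastUploadedModel {
    final Map<String, String> lastUploadedSstFiles = new HashMap<>();
    long lastCheckpointIdUploadedSst = -1L;
    long lastCompletedCheckpointId = -1L;

    /** Mirrors the diff: only a newer upload replaces the map. */
    void onUploadFinished(long checkpointId, Map<String, String> sstFiles) {
        if (checkpointId > lastCheckpointIdUploadedSst) {
            lastCheckpointIdUploadedSst = checkpointId;
            lastUploadedSstFiles.clear();
            lastUploadedSstFiles.putAll(sstFiles);
        }
    }

    /** Assumption: confirming a checkpoint leaves lastUploadedSstFiles untouched. */
    void onCheckpointConfirmed(long checkpointId) {
        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
    }

    /** True if the stored entries belong to a checkpoint that is already confirmed. */
    boolean uploadedFilesAlreadyConfirmed() {
        return !lastUploadedSstFiles.isEmpty()
                && lastCheckpointIdUploadedSst <= lastCompletedCheckpointId;
    }
}
```

Under that assumption the map holds the SST files of the most recent upload, possibly already confirmed, which matches the suggested wording.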

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -453,6 +470,19 @@ private void uploadSstFiles(
                 miscFiles.putAll(
                         stateUploader.uploadFilesToCheckpointFs(
                                 miscFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
+
+                synchronized (materializedSstFiles) {
+                    // ignore an older upload if it completed after a newer one has completed
+                    if (checkpointId > lastCheckpointIdUploadedSst) {
+                        lastCheckpointIdUploadedSst = checkpointId;
+                        lastUploadedSstFiles.clear();
+                        LOG.trace(
+                                "Update lastUploadedSstFiles for checkpoint {}: {}",
+                                checkpointId,
+                                sstFiles);
+                        lastUploadedSstFiles.putAll(sstFiles);
+                    }
+                }

Review comment:
       It's a bit confusing that you have this logic here, while `materializedSstFiles` is updated just after the `uploadSstFiles()` call. Could we merge those two synchronised sections?
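One way the merge could look, as a sketch: the per-checkpoint entry in `materializedSstFiles` and the out-of-order guard for `lastUploadedSstFiles` are updated in a single critical section right after the upload. The class and method names are hypothetical, `String` stands in for the Flink handle types, and recording into `materializedSstFiles` unconditionally is an assumption about the surrounding code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical sketch: one critical section for both pieces of bookkeeping after an upload. */
final class UploadBookkeeping {
    final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    final Map<String, String> lastUploadedSstFiles = new HashMap<>();
    private long lastCheckpointIdUploadedSst = -1L;

    void onSstFilesUploaded(long checkpointId, Map<String, String> sstFiles) {
        synchronized (materializedSstFiles) {
            // always remember which files this checkpoint materialized
            materializedSstFiles.put(checkpointId, new HashSet<>(sstFiles.keySet()));
            // ignore an older upload if it completed after a newer one has completed
            if (checkpointId > lastCheckpointIdUploadedSst) {
                lastCheckpointIdUploadedSst = checkpointId;
                lastUploadedSstFiles.clear();
                lastUploadedSstFiles.putAll(sstFiles);
            }
        }
    }
}
```

Keeping both updates under the same lock also means that a reader holding that lock never sees the two structures out of step.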

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1211,6 +1211,8 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
         final long checkpointId = pendingCheckpoint.getCheckpointId();

Review comment:
       In the last commit message:
   > For example, if a task sent state for the first time and then failed without completing checkpoint and restarting JM;
   
   Do you mean a JM failure, or a TM/task failure? That is, did you mean:
   > For example, if a task sent state for the first time and then JM failed without completing checkpoint and restarting JM
   
   Can you elaborate on the scenario?

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -258,30 +268,37 @@ private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throw
         }

Review comment:
       Shouldn't `RocksIncrementalSnapshotStrategy#notifyCheckpointAborted` be changed as well? Aborted files could now be reused too, right? They are still "materialised".
   
   Edit: actually I think the logic _may_ be correct, but something feels inconsistent here. It's really hard for me to name what is inside `materializedSstFiles`: it holds confirmed files and also files awaiting confirmation, but aborted files are removed from it.
   
   What is the purpose of `RocksIncrementalSnapshotStrategy#notifyCheckpointAborted` at all? Isn't it dead code? As far as I can tell, it only prevents memory leaks if checkpoints are continuously aborted. If we removed it, we wouldn't need the `lastUploadedSstFiles` field: `lastUploadedCheckpointId` + `materializedSstFiles` would be enough (and less confusing).
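To make the memory-leak argument concrete, a toy model. It assumes that completing a checkpoint prunes all older entries from `materializedSstFiles` (modelled here with `headMap(...).clear()`) and that an abort removes only the aborted entry; both rules, the class name and the `handleAborts` switch are hypothetical illustrations, not code from this PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical model of materializedSstFiles pruning with and without abort handling. */
final class MaterializedSstFilesModel {
    final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;
    private final boolean handleAborts;

    MaterializedSstFilesModel(boolean handleAborts) {
        this.handleAborts = handleAborts;
    }

    void onUploaded(long checkpointId, Set<String> files) {
        materializedSstFiles.put(checkpointId, new HashSet<>(files));
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointId > lastCompletedCheckpointId) {
            // keep the completed checkpoint as the new base, drop every older entry
            materializedSstFiles.headMap(checkpointId).clear();
            lastCompletedCheckpointId = checkpointId;
        }
    }

    void notifyCheckpointAborted(long checkpointId) {
        if (handleAborts) {
            materializedSstFiles.remove(checkpointId);
        }
    }
}
```

In this model the map stays bounded as long as checkpoints eventually complete; the abort handler only matters during a long run of consecutive aborts, which is the leak scenario described above.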




-- 
This is an automated message from the Apache Git Service.