rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1242179978


##########
flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java:
##########
@@ -67,14 +67,22 @@ public interface MaterializationTarget {
 
         /**
          * This method is not thread safe. It should be called either under a lock or through task
-         * mailbox executor.
+         * mailbox executor. Implementations should ensure that not to trigger materialization until
+         * the previous one not confirmed or failed.
          */
         void handleMaterializationResult(
                 SnapshotResult<KeyedStateHandle> materializedSnapshot,
                 long materializationID,
                 SequenceNumber upTo)
                 throws Exception;
 
+        /**
+         * This method is not thread safe. It should be called either under a lock or through task
+         * mailbox executor.
+         */

Review Comment:
   1. This is true for all the methods of this class, and is usually the case with most Flink code.
   2. Otherwise, use `@NotThreadSafe` on the class?



##########
flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java:
##########
@@ -67,14 +67,22 @@ public interface MaterializationTarget {
 
         /**
          * This method is not thread safe. It should be called either under a lock or through task
-         * mailbox executor.
+         * mailbox executor. Implementations should ensure that not to trigger materialization until
+         * the previous one not confirmed or failed.

Review Comment:
   ```suggestion
            * mailbox executor. Implementations should not trigger materialization until
            * the previous one has been confirmed or failed.
   ```



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -728,6 +731,7 @@ private ChangelogSnapshotState completeRestore(
                 materializationId = Math.max(materializationId, h.getMaterializationID());
             }
         }
+        this.lastFailedMaterializationId = materializationId;

Review Comment:
   Why do we set `lastFailedMaterializationId` to "restore" `materializationId`?



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -390,23 +393,36 @@ public void release() {
     }
 
     protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-            new PreviousSnapshot(Collections.emptyMap());
+            new PreviousSnapshot(Collections.emptyList());
 
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {
 
-        @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
-
-        protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
-            this.confirmedSstFiles = confirmedSstFiles;
+        @Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;
+
+        protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
+            this.confirmedSstFiles =
+                    confirmedSstFiles != null
+                            ? confirmedSstFiles.stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    HandleAndLocalPath::getLocalPath,
+                                                    HandleAndLocalPath::getHandle))
+                            : Collections.emptyMap();
         }
 
-        protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
-            if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                // we introduce a placeholder state handle, that is replaced with the
-                // original from the shared state registry (created from a previous checkpoint)
-                return Optional.of(
-                        new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
+        protected Optional<StreamStateHandle> getUploaded(String filename) {
+            if (confirmedSstFiles.containsKey(filename)) {
+                StreamStateHandle handle = confirmedSstFiles.get(filename);
+                if (handle instanceof ByteStreamStateHandle) {

Review Comment:
   I think this check doesn't add any value. It adds complexity: two more lines and, more importantly, it makes developers think about special cases.



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -758,6 +762,15 @@ private ChangelogSnapshotState completeRestore(
      */
     @Override
     public Optional<MaterializationRunnable> initMaterialization() throws Exception {
+        if (materializedId > 0
+                && lastConfirmedMaterializationId < materializedId - 1
+                && lastFailedMaterializationId < materializedId - 1) {
+            LOG.info(
+                    "materialization:{} not confirmed or failed or cancelled, skip trigger new one.",
+                    materializedId - 1);
+            return Optional.empty();
+        }

Review Comment:
   1. Could you please describe in a comment the motivation for skipping the materialization in this case?
   2. Also, ideally, this should be covered by some unit tests.
   3. Isn't `materializedId > 0` too weak when the job was recovered from some previous state (and `materializedId` was set in `completeRestore`)?
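One way to make point 2 concrete is to pull the skip condition from the hunk above into a pure predicate that can be unit-tested in isolation. This is only a minimal sketch: the class `MaterializationGuardSketch` and the method `shouldSkip` are hypothetical names that do not appear in the PR, and the IDs are modeled as plain `long` values.

```java
/** Hypothetical helper; mirrors the condition added to initMaterialization() in the hunk above. */
final class MaterializationGuardSketch {

    private MaterializationGuardSketch() {}

    /** Returns true when the previous materialization is neither confirmed nor failed. */
    static boolean shouldSkip(long materializedId, long lastConfirmedId, long lastFailedId) {
        long previousId = materializedId - 1;
        return materializedId > 0 && lastConfirmedId < previousId && lastFailedId < previousId;
    }

    public static void main(String[] args) {
        System.out.println(shouldSkip(0, -1, -1)); // false: nothing materialized yet
        System.out.println(shouldSkip(3, 1, 1));   // true: materialization 2 is still pending
        System.out.println(shouldSkip(3, 2, 1));   // false: materialization 2 was confirmed
    }
}
```

Under the assumption that both `last*` IDs start at -1, `shouldSkip(10, -1, -1)` returns true, which is the restored-job situation point 3 asks about.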



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java:
##########
@@ -32,9 +36,62 @@ public interface IncrementalKeyedStateHandle
     UUID getBackendIdentifier();
 
     /**
-     * Returns a set of ids of all registered shared states in the backend at the time this was
-     * created.
+     * Returns a list of all shared states and the corresponding localPath in the backend at the
+     * time this was created.
      */
     @Nonnull
-    Map<StateHandleID, StreamStateHandle> getSharedStateHandles();
+    List<HandleAndLocalPath> getSharedStateHandles();
+
+    /** A Holder of StreamStateHandle and the corresponding localPath. */
+    final class HandleAndLocalPath implements Serializable {
+
+        private static final long serialVersionUID = 7711754687567545052L;
+
+        StreamStateHandle handle;
+        final String localPath;

Review Comment:
   I think marking this field `final` belongs in the previous commit.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java:
##########
@@ -334,53 +325,61 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR
             }
         }
 
-        private void uploadSstFiles(
-                @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
-                @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
+        /** upload files and return total uploaded size. */
+        private long uploadSnapshotFiles(
+                @Nonnull List<HandleAndLocalPath> sstFiles,
+                @Nonnull List<HandleAndLocalPath> miscFiles,
                 @Nonnull CloseableRegistry snapshotCloseableRegistry,
                 @Nonnull CloseableRegistry tmpResourcesRegistry)
                 throws Exception {
 
             // write state data
             Preconditions.checkState(localBackupDirectory.exists());
 
-            Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
-            Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
-
             Path[] files = localBackupDirectory.listDirectory();
+            long uploadedSize = 0;
             if (files != null) {
+                List<Path> sstFilePaths = new ArrayList<>(files.length);
+                List<Path> miscFilePaths = new ArrayList<>(files.length);
+
                 createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
 
                 final CheckpointedStateScope stateScope =
                         sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING
                                 ? CheckpointedStateScope.EXCLUSIVE
                                 : CheckpointedStateScope.SHARED;
-                sstFiles.putAll(
+
+                List<HandleAndLocalPath> uploadedSstFiles =
                         stateUploader.uploadFilesToCheckpointFs(
                                 sstFilePaths,
                                 checkpointStreamFactory,
                                 stateScope,
                                 snapshotCloseableRegistry,
-                                tmpResourcesRegistry));
-                miscFiles.putAll(
+                                tmpResourcesRegistry);
+                uploadedSize +=
+                        uploadedSstFiles.stream()
+                                .mapToLong(e -> e.getHandle().getStateSize())

Review Comment:
   NIT: add method `getStateSize` to `HandleAndLocalPath`?
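A delegating accessor along these lines would shorten the `mapToLong` call sites. This is a minimal sketch with stand-in types: `Handle` is a hypothetical substitute for `StreamStateHandle`, and `Entry` only imitates the shape of `HandleAndLocalPath`.

```java
import java.util.List;

final class HandleSizeSketch {

    /** Hypothetical stand-in for StreamStateHandle; only the size accessor is modeled. */
    interface Handle {
        long getStateSize();
    }

    /** Simplified holder imitating HandleAndLocalPath from the diff. */
    static final class Entry {
        private final Handle handle;
        private final String localPath;

        Entry(Handle handle, String localPath) {
            this.handle = handle;
            this.localPath = localPath;
        }

        Handle getHandle() {
            return handle;
        }

        String getLocalPath() {
            return localPath;
        }

        /** The accessor suggested in the review: delegate to the wrapped handle. */
        long getStateSize() {
            return handle.getStateSize();
        }
    }

    /** Call sites can then use a method reference instead of a lambda chain. */
    static long totalSize(List<Entry> uploaded) {
        return uploaded.stream().mapToLong(Entry::getStateSize).sum();
    }
}
```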



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -390,23 +393,36 @@ public void release() {
     }
 
     protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-            new PreviousSnapshot(Collections.emptyMap());
+            new PreviousSnapshot(Collections.emptyList());
 
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {
 
-        @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
-
-        protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
-            this.confirmedSstFiles = confirmedSstFiles;
+        @Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;

Review Comment:
   NIT: `confirmedSstFiles` could be of type `Map<String, HandleAndLocalPath>`, 
the code would be a bit simpler then
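For illustration, keeping the whole holder as the map value could look roughly like the sketch below. `Entry` is a hypothetical stand-in for `HandleAndLocalPath` (with the handle reduced to a `String`), and `PreviousSnapshotSketch` is not the class from the PR.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

final class PreviousSnapshotSketch {

    /** Hypothetical stand-in for HandleAndLocalPath; the handle is modeled as a String. */
    static final class Entry {
        final String handle;
        final String localPath;

        Entry(String handle, String localPath) {
            this.handle = handle;
            this.localPath = localPath;
        }

        String getLocalPath() {
            return localPath;
        }
    }

    private final Map<String, Entry> confirmedSstFiles;

    PreviousSnapshotSketch(Collection<Entry> confirmed) {
        if (confirmed == null) {
            this.confirmedSstFiles = Collections.emptyMap();
        } else {
            // Key by local path and keep the holder itself as the value.
            this.confirmedSstFiles =
                    confirmed.stream()
                            .collect(Collectors.toMap(Entry::getLocalPath, Function.identity()));
        }
    }

    /** Lookup by file name; no extra unwrapping step is needed. */
    Optional<Entry> getUploaded(String filename) {
        return Optional.ofNullable(confirmedSstFiles.get(filename));
    }
}
```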



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -80,9 +82,9 @@ public Map<StateHandleID, StreamStateHandle> uploadFilesToCheckpointFs(
         try {
             FutureUtils.waitForAll(futures.values()).get();
 
-            for (Map.Entry<StateHandleID, CompletableFuture<StreamStateHandle>> entry :
+            for (Map.Entry<String, CompletableFuture<StreamStateHandle>> entry :
                     futures.entrySet()) {
-                handles.put(entry.getKey(), entry.getValue().get());
+                handles.add(HandleAndLocalPath.of(entry.getValue().get(), entry.getKey()));

Review Comment:
   The code in `uploadFilesToCheckpointFs` can be simplified if 
`HandleAndLocalPath` is created inside `createUploadFutures`.
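The idea can be sketched with plain `CompletableFuture`s: if each future already yields the holder, the caller only has to wait and collect. Everything here is an assumption made for illustration. `Entry` stands in for `HandleAndLocalPath`, `upload` is a fake upload that returns a string handle, and the real `createUploadFutures` signature in `RocksDBStateUploader` is not reproduced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class UploadFuturesSketch {

    /** Hypothetical stand-in for HandleAndLocalPath. */
    static final class Entry {
        final String handle;
        final String localPath;

        Entry(String handle, String localPath) {
            this.handle = handle;
            this.localPath = localPath;
        }
    }

    /** Fake upload, for illustration only. */
    static String upload(String localPath) {
        return "remote:" + localPath;
    }

    /** Each future already produces the holder, so no key/value pairing is needed later. */
    static List<CompletableFuture<Entry>> createUploadFutures(List<String> localPaths) {
        List<CompletableFuture<Entry>> futures = new ArrayList<>(localPaths.size());
        for (String path : localPaths) {
            futures.add(
                    CompletableFuture.supplyAsync(() -> upload(path))
                            .thenApply(handle -> new Entry(handle, path)));
        }
        return futures;
    }

    /** The caller waits for all futures and collects the results in input order. */
    static List<Entry> uploadFiles(List<String> localPaths) {
        List<CompletableFuture<Entry>> futures = createUploadFutures(localPaths);
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```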



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java:
##########
@@ -219,34 +219,40 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR
             }
         }
 
-        private void uploadSstFiles(
-                @Nonnull Map<StateHandleID, StreamStateHandle> privateFiles,
+        /** upload files and return total uploaded size. */
+        private long uploadSnapshotFiles(
+                @Nonnull List<HandleAndLocalPath> privateFiles,
                 @Nonnull CloseableRegistry snapshotCloseableRegistry,
                 @Nonnull CloseableRegistry tmpResourcesRegistry)
                 throws Exception {
 
             // write state data
             Preconditions.checkState(localBackupDirectory.exists());
 
-            Map<StateHandleID, Path> privateFilePaths = new HashMap<>();
-
             Path[] files = localBackupDirectory.listDirectory();
+            long uploadedSize = 0;
             if (files != null) {
+                List<Path> privateFilePaths = new ArrayList<>(files.length);
+
                 // all sst files are private in full snapshot
                 for (Path filePath : files) {
-                    final String fileName = filePath.getFileName().toString();
-                    final StateHandleID stateHandleID = new StateHandleID(fileName);
-                    privateFilePaths.put(stateHandleID, filePath);
+                    privateFilePaths.add(filePath);

Review Comment:
   Is this intermediate variable still necessary? Can't we just use 
`Arrays.asList(files)`?
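A minimal sketch of that simplification, assuming the array holds `java.nio.file.Path` elements and that the uploader only reads the list; `PrivateFilePathsSketch` and `toUploadList` are hypothetical names:

```java
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PrivateFilePathsSketch {

    private PrivateFilePathsSketch() {}

    /** Wraps the directory listing directly instead of copying it element by element. */
    static List<Path> toUploadList(Path[] files) {
        return files == null ? Collections.emptyList() : Arrays.asList(files);
    }
}
```

`Arrays.asList` returns a fixed-size view backed by the array, so this only fits if nothing downstream adds to or removes from the list.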



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -122,37 +121,20 @@ public StreamStateHandle registerReference(
                 LOG.trace(
                         "Duplicated registration under key {} with a placeholder (normal case)",
                         registrationKey);
-                scheduledStateDeletion = newHandle;
-            } else if (entry.confirmed) {
-                LOG.info(
-                        "Duplicated registration under key {} of a new state: {}. "
-                                + "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
-                                + "Discarding the new state and keeping the old one which is included into a completed checkpoint",
-                        registrationKey,
-                        newHandle);
-                scheduledStateDeletion = newHandle;
             } else {
-                // Old entry is not in a confirmed checkpoint yet, and the new one differs.
-                // This might result from (omitted KG range here for simplicity):
-                // 1. Flink recovers from a failure using a checkpoint 1
-                // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
-                // 3. JM triggers checkpoint 2
-                // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
-                // 5. TM crashes; everything is repeated from (2)
-                // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
-                // 7. JM triggers checkpoint 3
-                // 8. TM sends NEW state "xyz-002.sst"
-                // 9. JM discards it as duplicate
-                // 10. checkpoint completes, but a wrong SST file is used
-                // So we use a new entry and discard the old one:
+                // might be a bug expect the StreamStateHandleWrapper used by
+                // ChangelogStateBackendHandleImpl
                 LOG.info(
-                        "Duplicated registration under key {} of a new state: {}. "
-                                + "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
-                                + "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
+                        "the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}",
                         registrationKey,
                         newHandle);
-                scheduledStateDeletion = entry.stateHandle;
-                entry.stateHandle = newHandle;
+                if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) {

Review Comment:
   I think there can be two cases related to `StreamStateHandleWrapper`:
   1. recovered from a non-changelog checkpoint + new changelog checkpoint
   2. recovered from a changelog checkpoint + new changelog checkpoint(s)
   
   The 1st one will use `EmptyDiscardStateObjectForRegister` for the initial state (`entry.stateHandle`) and is covered by your branch.
   The 2nd will use `StreamStateHandleWrapper` for both the initial and the new checkpoint. According to `StreamStateHandleWrapper.equals`, the handles must be equal, so this branch (not-equals) should not be reached.
   
   So reaching it looks like a bug, and therefore we should throw an exception.
   Or am I missing something?



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java:
##########
@@ -78,11 +76,11 @@ public class RocksIncrementalSnapshotStrategy<K>
     private static final String DESCRIPTION = "Asynchronous incremental RocksDB snapshot";
 
     /**
-     * Stores the {@link StateHandleID IDs} of uploaded SST files that build the incremental
-     * history. Once the checkpoint is confirmed by JM, only the ID paired with {@link
-     * PlaceholderStreamStateHandle} can be sent.
+     * Stores the {@link StreamStateHandle} and corresponding local path of uploaded SST files that
+     * build the incremental history. Once the checkpoint is confirmed by JM, they can be reused for
+     * incremental checkpoint.
      */
-    @Nonnull private final SortedMap<Long, Map<StateHandleID, Long>> uploadedStateIDs;
+    @Nonnull private final SortedMap<Long, Collection<HandleAndLocalPath>> uploadedSstFilesMap;

Review Comment:
   NIT: `uploadedSstFilesMap` -> `uploadedSstFiles` ?



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java:
##########
@@ -334,53 +325,61 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR
             }
         }
 
-        private void uploadSstFiles(
-                @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
-                @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
+        /** upload files and return total uploaded size. */
+        private long uploadSnapshotFiles(
+                @Nonnull List<HandleAndLocalPath> sstFiles,
+                @Nonnull List<HandleAndLocalPath> miscFiles,
                 @Nonnull CloseableRegistry snapshotCloseableRegistry,
                 @Nonnull CloseableRegistry tmpResourcesRegistry)
                 throws Exception {
 
             // write state data
             Preconditions.checkState(localBackupDirectory.exists());
 
-            Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
-            Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
-
             Path[] files = localBackupDirectory.listDirectory();
+            long uploadedSize = 0;
             if (files != null) {
+                List<Path> sstFilePaths = new ArrayList<>(files.length);
+                List<Path> miscFilePaths = new ArrayList<>(files.length);
+
                 createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
 
                 final CheckpointedStateScope stateScope =
                         sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING
                                 ? CheckpointedStateScope.EXCLUSIVE
                                 : CheckpointedStateScope.SHARED;
-                sstFiles.putAll(
+
+                List<HandleAndLocalPath> uploadedSstFiles =
                         stateUploader.uploadFilesToCheckpointFs(
                                 sstFilePaths,
                                 checkpointStreamFactory,
                                 stateScope,
                                 snapshotCloseableRegistry,
-                                tmpResourcesRegistry));
-                miscFiles.putAll(
+                                tmpResourcesRegistry);
+                uploadedSize +=
+                        uploadedSstFiles.stream()
+                                .mapToLong(e -> e.getHandle().getStateSize())
+                                .sum();
+                sstFiles.addAll(uploadedSstFiles);
+
+                List<HandleAndLocalPath> uploadedMiscFiles =
                         stateUploader.uploadFilesToCheckpointFs(
                                 miscFilePaths,
                                 checkpointStreamFactory,
                                 stateScope,
                                 snapshotCloseableRegistry,
-                                tmpResourcesRegistry));
-
-                synchronized (uploadedStateIDs) {
+                                tmpResourcesRegistry);
+                uploadedSize +=
+                        uploadedMiscFiles.stream()
+                                .mapToLong(e -> e.getHandle().getStateSize())
+                                .sum();
+                miscFiles.addAll(uploadedMiscFiles);
+
+                synchronized (uploadedSstFilesMap) {
                     switch (sharingFilesStrategy) {
                         case FORWARD_BACKWARD:
                         case FORWARD:
-                            uploadedStateIDs.put(
-                                    checkpointId,
-                                    sstFiles.entrySet().stream()
-                                            .collect(
-                                                    Collectors.toMap(
-                                                            Map.Entry::getKey,
-                                                            t -> t.getValue().getStateSize())));
+                            uploadedSstFilesMap.put(checkpointId, sstFiles);

Review Comment:
   Should `sstFiles` be guarded against modifications?
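One possible guard is to store an unmodifiable defensive copy, so that later changes to the caller's list cannot alter what was recorded for the checkpoint. This is a sketch under assumptions: `UploadedFilesRegistrySketch` is a hypothetical class, and file entries are reduced to `String` for brevity.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

final class UploadedFilesRegistrySketch {

    private final SortedMap<Long, Collection<String>> uploadedSstFiles = new TreeMap<>();

    /** Stores a read-only copy, decoupled from the list the caller keeps using. */
    void recordUpload(long checkpointId, List<String> sstFiles) {
        synchronized (uploadedSstFiles) {
            uploadedSstFiles.put(
                    checkpointId, Collections.unmodifiableList(new ArrayList<>(sstFiles)));
        }
    }

    Collection<String> get(long checkpointId) {
        synchronized (uploadedSstFiles) {
            return uploadedSstFiles.get(checkpointId);
        }
    }
}
```

The copy costs one extra allocation per checkpoint; wrapping without copying would block writes through the map but still reflect changes made through the original list.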



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java:
##########
@@ -219,34 +219,40 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR
             }
         }
 
-        private void uploadSstFiles(
-                @Nonnull Map<StateHandleID, StreamStateHandle> privateFiles,
+        /** upload files and return total uploaded size. */
+        private long uploadSnapshotFiles(
+                @Nonnull List<HandleAndLocalPath> privateFiles,

Review Comment:
   NIT: This method can be simplified by returning the list of files and computing the size in the caller.
   Not sure whether this is in the scope of this PR.


