rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1242179978
########## flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java: ########## @@ -67,14 +67,22 @@ public interface MaterializationTarget { /** * This method is not thread safe. It should be called either under a lock or through task - * mailbox executor. + * mailbox executor. Implementations should ensure that not to trigger materialization until + * the previous one not confirmed or failed. */ void handleMaterializationResult( SnapshotResult<KeyedStateHandle> materializedSnapshot, long materializationID, SequenceNumber upTo) throws Exception; + /** + * This method is not thread safe. It should be called either under a lock or through task + * mailbox executor. + */ Review Comment: 1. This is true for all the methods of this class, and is usually the case with most Flink code 2. Otherwise, use `@NotThreadSafe` on class? ########## flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java: ########## @@ -67,14 +67,22 @@ public interface MaterializationTarget { /** * This method is not thread safe. It should be called either under a lock or through task - * mailbox executor. + * mailbox executor. Implementations should ensure that not to trigger materialization until + * the previous one not confirmed or failed. Review Comment: ```suggestion * mailbox executor. Implementations should not trigger materialization until * the previous one has been confirmed or failed. ``` ########## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ########## @@ -728,6 +731,7 @@ private ChangelogSnapshotState completeRestore( materializationId = Math.max(materializationId, h.getMaterializationID()); } } + this.lastFailedMaterializationId = materializationId; Review Comment: Why do we set `lastFailedMaterializationId` to "restore" `materializationId`? 
########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ########## @@ -390,23 +393,36 @@ public void release() { } protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = - new PreviousSnapshot(Collections.emptyMap()); + new PreviousSnapshot(Collections.emptyList()); /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { - @Nullable private final Map<StateHandleID, Long> confirmedSstFiles; - - protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) { - this.confirmedSstFiles = confirmedSstFiles; + @Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles; + + protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) { + this.confirmedSstFiles = + confirmedSstFiles != null + ? confirmedSstFiles.stream() + .collect( + Collectors.toMap( + HandleAndLocalPath::getLocalPath, + HandleAndLocalPath::getHandle)) + : Collections.emptyMap(); } - protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) { - if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) - return Optional.of( - new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); + protected Optional<StreamStateHandle> getUploaded(String filename) { + if (confirmedSstFiles.containsKey(filename)) { + StreamStateHandle handle = confirmedSstFiles.get(filename); + if (handle instanceof ByteStreamStateHandle) { Review Comment: I think this check doesn't add any value. It adds complexity through two extra lines and, more importantly, by making developers think about special cases.
########## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ########## @@ -758,6 +762,15 @@ private ChangelogSnapshotState completeRestore( */ @Override public Optional<MaterializationRunnable> initMaterialization() throws Exception { + if (materializedId > 0 + && lastConfirmedMaterializationId < materializedId - 1 + && lastFailedMaterializationId < materializedId - 1) { + LOG.info( + "materialization:{} not confirmed or failed or cancelled, skip trigger new one.", + materializedId - 1); + return Optional.empty(); + } Review Comment: 1. Could you please describe in a comment the motivation for skipping the materialization in this case? 2. Also, ideally, this should be covered by some unit tests. 3. Isn't `materializedId > 0` too weak when the job was recovered from some previous state (and `materializedId` was set in `completeRestore`)? ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java: ########## @@ -32,9 +36,62 @@ public interface IncrementalKeyedStateHandle UUID getBackendIdentifier(); /** - * Returns a set of ids of all registered shared states in the backend at the time this was - * created. + * Returns a list of all shared states and the corresponding localPath in the backend at the + * time this was created. */ @Nonnull - Map<StateHandleID, StreamStateHandle> getSharedStateHandles(); + List<HandleAndLocalPath> getSharedStateHandles(); + + /** A Holder of StreamStateHandle and the corresponding localPath. */ + final class HandleAndLocalPath implements Serializable { + + private static final long serialVersionUID = 7711754687567545052L; + + StreamStateHandle handle; + final String localPath; Review Comment: I think marking this field `final` belongs in the previous commit.
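To make the unit-test request on the `initMaterialization` guard more concrete, here is a minimal sketch, assuming the condition can be extracted into a pure function. The class `MaterializationGuard` and the method `shouldSkip` are hypothetical names that do not exist in the PR; only the boolean expression is taken from the quoted hunk.

```java
/** Hypothetical helper, not part of the PR: isolates the guard so it can be unit tested. */
final class MaterializationGuard {

    private MaterializationGuard() {}

    /**
     * Mirrors the condition from the quoted hunk: skip a new materialization when the
     * previous one (materializedId - 1) has been neither confirmed nor failed.
     */
    static boolean shouldSkip(long materializedId, long lastConfirmedId, long lastFailedId) {
        long previousId = materializedId - 1;
        return materializedId > 0 && lastConfirmedId < previousId && lastFailedId < previousId;
    }
}
```

A test could then call `shouldSkip` with the values the three fields hold right after `completeRestore`, which is the scenario raised in point 3.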
########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java: ########## @@ -334,53 +325,61 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR } } - private void uploadSstFiles( - @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles, - @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles, + /** upload files and return total uploaded size. */ + private long uploadSnapshotFiles( + @Nonnull List<HandleAndLocalPath> sstFiles, + @Nonnull List<HandleAndLocalPath> miscFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception { // write state data Preconditions.checkState(localBackupDirectory.exists()); - Map<StateHandleID, Path> sstFilePaths = new HashMap<>(); - Map<StateHandleID, Path> miscFilePaths = new HashMap<>(); - Path[] files = localBackupDirectory.listDirectory(); + long uploadedSize = 0; if (files != null) { + List<Path> sstFilePaths = new ArrayList<>(files.length); + List<Path> miscFilePaths = new ArrayList<>(files.length); + createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths); final CheckpointedStateScope stateScope = sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING ? CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED; - sstFiles.putAll( + + List<HandleAndLocalPath> uploadedSstFiles = stateUploader.uploadFilesToCheckpointFs( sstFilePaths, checkpointStreamFactory, stateScope, snapshotCloseableRegistry, - tmpResourcesRegistry)); - miscFiles.putAll( + tmpResourcesRegistry); + uploadedSize += + uploadedSstFiles.stream() + .mapToLong(e -> e.getHandle().getStateSize()) Review Comment: NIT: add method `getStateSize` to `HandleAndLocalPath`? 
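For illustration, the `getStateSize` NIT could look roughly like the sketch below. `SizedHandle` and `HandleAndLocalPathSketch` are simplified stand-ins invented for this example (the real types are `StreamStateHandle` and `HandleAndLocalPath`), and `totalSize` is a hypothetical helper showing how the call site would read.

```java
import java.io.Serializable;
import java.util.List;

/** Stand-in for Flink's StreamStateHandle; only the size accessor is modeled. */
interface SizedHandle extends Serializable {
    long getStateSize();
}

/** Simplified holder modeled on HandleAndLocalPath from the PR (assumption, not the real class). */
final class HandleAndLocalPathSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final SizedHandle handle;
    private final String localPath;

    private HandleAndLocalPathSketch(SizedHandle handle, String localPath) {
        this.handle = handle;
        this.localPath = localPath;
    }

    static HandleAndLocalPathSketch of(SizedHandle handle, String localPath) {
        return new HandleAndLocalPathSketch(handle, localPath);
    }

    SizedHandle getHandle() {
        return handle;
    }

    String getLocalPath() {
        return localPath;
    }

    /** Delegates to the wrapped handle so callers do not need getHandle().getStateSize(). */
    long getStateSize() {
        return handle.getStateSize();
    }

    /** Hypothetical call site: sums the sizes of all uploaded files. */
    static long totalSize(List<HandleAndLocalPathSketch> files) {
        return files.stream().mapToLong(HandleAndLocalPathSketch::getStateSize).sum();
    }
}
```

With such a method, the `mapToLong(e -> e.getHandle().getStateSize())` calls in `uploadSnapshotFiles` would shrink to a method reference.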
########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ########## @@ -390,23 +393,36 @@ public void release() { } protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = - new PreviousSnapshot(Collections.emptyMap()); + new PreviousSnapshot(Collections.emptyList()); /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { - @Nullable private final Map<StateHandleID, Long> confirmedSstFiles; - - protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) { - this.confirmedSstFiles = confirmedSstFiles; + @Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles; Review Comment: NIT: `confirmedSstFiles` could be of type `Map<String, HandleAndLocalPath>`, the code would be a bit simpler then ########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java: ########## @@ -80,9 +82,9 @@ public Map<StateHandleID, StreamStateHandle> uploadFilesToCheckpointFs( try { FutureUtils.waitForAll(futures.values()).get(); - for (Map.Entry<StateHandleID, CompletableFuture<StreamStateHandle>> entry : + for (Map.Entry<String, CompletableFuture<StreamStateHandle>> entry : futures.entrySet()) { - handles.put(entry.getKey(), entry.getValue().get()); + handles.add(HandleAndLocalPath.of(entry.getValue().get(), entry.getKey())); Review Comment: The code in `uploadFilesToCheckpointFs` can be simplified if `HandleAndLocalPath` is created inside `createUploadFutures`. 
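A rough sketch of the `createUploadFutures` simplification, under several assumptions: `Uploaded` stands in for `HandleAndLocalPath` with the handle reduced to a `String`, the `upload` function is a placeholder for the real upload call, and `CompletableFuture.allOf(...).join()` is a plain-JDK substitute for `FutureUtils.waitForAll(...).get()`. None of these names come from the PR.

```java
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

final class UploadSketch {

    /** Stand-in for HandleAndLocalPath; the handle is reduced to a String here. */
    static final class Uploaded {
        final String handle;
        final String localPath;

        Uploaded(String handle, String localPath) {
            this.handle = handle;
            this.localPath = localPath;
        }
    }

    /** Each future already yields the (handle, localPath) pair, so no map keyed by path is needed. */
    static List<CompletableFuture<Uploaded>> createUploadFutures(
            List<Path> files, Function<Path, String> upload) {
        return files.stream()
                .map(p -> CompletableFuture.supplyAsync(() -> upload.apply(p))
                        .thenApply(h -> new Uploaded(h, p.getFileName().toString())))
                .collect(Collectors.toList());
    }

    /** Waits for all uploads and collects the results in input order. */
    static List<Uploaded> uploadAll(List<Path> files, Function<Path, String> upload) {
        List<CompletableFuture<Uploaded>> futures = createUploadFutures(files, upload);
        // Plain-JDK substitute for FutureUtils.waitForAll(futures).get() in the real code.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```

The loop over `futures.entrySet()` then disappears, because the pairing of handle and local path happens where the future is created.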
########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java: ########## @@ -219,34 +219,40 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR } } - private void uploadSstFiles( - @Nonnull Map<StateHandleID, StreamStateHandle> privateFiles, + /** upload files and return total uploaded size. */ + private long uploadSnapshotFiles( + @Nonnull List<HandleAndLocalPath> privateFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception { // write state data Preconditions.checkState(localBackupDirectory.exists()); - Map<StateHandleID, Path> privateFilePaths = new HashMap<>(); - Path[] files = localBackupDirectory.listDirectory(); + long uploadedSize = 0; if (files != null) { + List<Path> privateFilePaths = new ArrayList<>(files.length); + // all sst files are private in full snapshot for (Path filePath : files) { - final String fileName = filePath.getFileName().toString(); - final StateHandleID stateHandleID = new StateHandleID(fileName); - privateFilePaths.put(stateHandleID, filePath); + privateFilePaths.add(filePath); Review Comment: Is this intermediate variable still necessary? Can't we just use `Arrays.asList(files)`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ########## @@ -122,37 +121,20 @@ public StreamStateHandle registerReference( LOG.trace( "Duplicated registration under key {} with a placeholder (normal case)", registrationKey); - scheduledStateDeletion = newHandle; - } else if (entry.confirmed) { - LOG.info( - "Duplicated registration under key {} of a new state: {}. " - + "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. 
" - + "Discarding the new state and keeping the old one which is included into a completed checkpoint", - registrationKey, - newHandle); - scheduledStateDeletion = newHandle; } else { - // Old entry is not in a confirmed checkpoint yet, and the new one differs. - // This might result from (omitted KG range here for simplicity): - // 1. Flink recovers from a failure using a checkpoint 1 - // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst } - // 3. JM triggers checkpoint 2 - // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst" - // 5. TM crashes; everything is repeated from (2) - // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } - // 7. JM triggers checkpoint 3 - // 8. TM sends NEW state "xyz-002.sst" - // 9. JM discards it as duplicate - // 10. checkpoint completes, but a wrong SST file is used - // So we use a new entry and discard the old one: + // might be a bug expect the StreamStateHandleWrapper used by + // ChangelogStateBackendHandleImpl LOG.info( - "Duplicated registration under key {} of a new state: {}. " - + "This might happen during the task failover if state backend creates different states with the same key before and after the failure. " - + "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint", + "the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}", registrationKey, newHandle); - scheduledStateDeletion = entry.stateHandle; - entry.stateHandle = newHandle; + if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) { Review Comment: I think there can be two cases related to `StreamStateHandleWrapper`: 1. recovered from non-changelog-checkpoint + new changelog-checkpoint 2. recovered from changelog-checkpoint + new changelog-checkpoint (s) The 1st one will use `EmptyDiscardStateObjectForRegister` for the initial state (`entry.stateHandle`) and is covered by your branch. 
The 2nd will use `StreamStateHandleWrapper` - for both initial and the new checkpoint. According to `StreamStateHandleWrapper.equals` the handles must be equal. So this branch (not-equals) should not be reached. So it looks like it would be a bug and therefore we should throw an exception. Or am I missing something? ########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java: ########## @@ -78,11 +76,11 @@ public class RocksIncrementalSnapshotStrategy<K> private static final String DESCRIPTION = "Asynchronous incremental RocksDB snapshot"; /** - * Stores the {@link StateHandleID IDs} of uploaded SST files that build the incremental - * history. Once the checkpoint is confirmed by JM, only the ID paired with {@link - * PlaceholderStreamStateHandle} can be sent. + * Stores the {@link StreamStateHandle} and corresponding local path of uploaded SST files that + * build the incremental history. Once the checkpoint is confirmed by JM, they can be reused for + * incremental checkpoint. */ - @Nonnull private final SortedMap<Long, Map<StateHandleID, Long>> uploadedStateIDs; + @Nonnull private final SortedMap<Long, Collection<HandleAndLocalPath>> uploadedSstFilesMap; Review Comment: NIT: `uploadedSstFilesMap` -> `uploadedSstFiles` ? ########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java: ########## @@ -334,53 +325,61 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR } } - private void uploadSstFiles( - @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles, - @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles, + /** upload files and return total uploaded size. 
*/ + private long uploadSnapshotFiles( + @Nonnull List<HandleAndLocalPath> sstFiles, + @Nonnull List<HandleAndLocalPath> miscFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception { // write state data Preconditions.checkState(localBackupDirectory.exists()); - Map<StateHandleID, Path> sstFilePaths = new HashMap<>(); - Map<StateHandleID, Path> miscFilePaths = new HashMap<>(); - Path[] files = localBackupDirectory.listDirectory(); + long uploadedSize = 0; if (files != null) { + List<Path> sstFilePaths = new ArrayList<>(files.length); + List<Path> miscFilePaths = new ArrayList<>(files.length); + createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths); final CheckpointedStateScope stateScope = sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING ? CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED; - sstFiles.putAll( + + List<HandleAndLocalPath> uploadedSstFiles = stateUploader.uploadFilesToCheckpointFs( sstFilePaths, checkpointStreamFactory, stateScope, snapshotCloseableRegistry, - tmpResourcesRegistry)); - miscFiles.putAll( + tmpResourcesRegistry); + uploadedSize += + uploadedSstFiles.stream() + .mapToLong(e -> e.getHandle().getStateSize()) + .sum(); + sstFiles.addAll(uploadedSstFiles); + + List<HandleAndLocalPath> uploadedMiscFiles = stateUploader.uploadFilesToCheckpointFs( miscFilePaths, checkpointStreamFactory, stateScope, snapshotCloseableRegistry, - tmpResourcesRegistry)); - - synchronized (uploadedStateIDs) { + tmpResourcesRegistry); + uploadedSize += + uploadedMiscFiles.stream() + .mapToLong(e -> e.getHandle().getStateSize()) + .sum(); + miscFiles.addAll(uploadedMiscFiles); + + synchronized (uploadedSstFilesMap) { switch (sharingFilesStrategy) { case FORWARD_BACKWARD: case FORWARD: - uploadedStateIDs.put( - checkpointId, - sstFiles.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - t -> t.getValue().getStateSize()))); + 
uploadedSstFilesMap.put(checkpointId, sstFiles); Review Comment: Should `sstFiles` be guarded against modifications? ########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java: ########## @@ -219,34 +219,40 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR } } - private void uploadSstFiles( - @Nonnull Map<StateHandleID, StreamStateHandle> privateFiles, + /** upload files and return total uploaded size. */ + private long uploadSnapshotFiles( + @Nonnull List<HandleAndLocalPath> privateFiles, Review Comment: NIT: This method can be simplified by returning the list of files and computing the size in the caller. Not sure whether this is in scope for this PR.