This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new f396e6da3d2 [FLINK-37367][state/forst] Make ForSt inherit uploaded SST
files after restorations. (#26259)
f396e6da3d2 is described below
commit f396e6da3d23e31d987ae7fdb0f4acbc1661a1b9
Author: AlexYinHan <[email protected]>
AuthorDate: Thu Mar 6 19:46:07 2025 +0800
[FLINK-37367][state/forst] Make ForSt inherit uploaded SST files after
restorations. (#26259)
---
.../apache/flink/state/forst/ForStKeyedStateBackendBuilder.java | 8 +++++++-
.../flink/state/forst/datatransfer/CopyDataTransferStrategy.java | 5 -----
.../flink/state/forst/fs/filemapping/FileMappingManager.java | 4 ++++
3 files changed, 11 insertions(+), 6 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 67aa91371c2..25649fcef7f 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -263,10 +263,16 @@ public class ForStKeyedStateBackendBuilder<K>
defaultColumnFamilyHandle =
restoreResult.getDefaultColumnFamilyHandle();
nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
- // TODO: init materializedSstFiles and lastCompletedCheckpointId
when implement restore
SortedMap<Long,
Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>
materializedSstFiles = new TreeMap<>();
long lastCompletedCheckpointId = -1L;
+ if (restoreOperation instanceof ForStIncrementalRestoreOperation) {
+ backendUID = restoreResult.getBackendUID();
+ lastCompletedCheckpointId =
restoreResult.getLastCompletedCheckpointId();
+ if (recoveryClaimMode != RecoveryClaimMode.NO_CLAIM) {
+ materializedSstFiles = restoreResult.getRestoredSstFiles();
+ }
+ }
snapshotStrategy =
initializeSnapshotStrategy(
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
index 860cbbc0234..6686ba41878 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
-import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
@@ -115,10 +114,6 @@ public class CopyDataTransferStrategy extends
DataTransferStrategy {
((ForStFlinkFileSystem)
dbFileSystem).getMappingEntry(dbFilePath);
Preconditions.checkNotNull(mappingEntry, "dbFile not found: " +
dbFilePath);
sourceStateHandle = mappingEntry.getSource().toStateHandle();
- if (mappingEntry.getFileOwnership() == FileOwnership.NOT_OWNED) {
- // The file is already owned by JM, simply return the state
handle
- return HandleAndLocalPath.of(sourceStateHandle,
dbFilePath.getName());
- }
} else {
// Construct a FileStateHandle base on the DB file
FileSystem sourceFileSystem = dbFilePath.getFileSystem();
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
index 7d5544e8f8d..1e8cea5e58f 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
@@ -165,6 +165,10 @@ public class FileMappingManager {
* @return always return true except for IOException
*/
public boolean renameFile(String src, String dst) throws IOException {
+ if (src.equals(dst)) {
+ return true;
+ }
+
MappingEntry srcEntry = mappingTable.get(src);
if (srcEntry != null) { // rename file
if (mappingTable.containsKey(dst)) {