This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new 4f71f44145b [FLINK-37597][state/forst] Use ResourceGuard to make sure snapshots quit before ForSt disposal
4f71f44145b is described below
commit 4f71f44145b0ab7ab3cc23783cd4d5108efafb2e
Author: fredia <[email protected]>
AuthorDate: Tue Apr 1 14:42:23 2025 +0800
[FLINK-37597][state/forst] Use ResourceGuard to make sure snapshots quit before ForSt disposal
---
.../flink/state/forst/ForStKeyedStateBackend.java | 17 +++++
.../state/forst/ForStKeyedStateBackendBuilder.java | 1 +
.../snapshot/ForStNativeFullSnapshotStrategy.java | 7 +-
.../forst/snapshot/ForStSnapshotStrategyBase.java | 87 ----------------------
4 files changed, 23 insertions(+), 89 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 5cc7e2f3d2b..4b40f89fb2a 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -60,6 +60,7 @@ import
org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
import org.forstdb.ColumnFamilyHandle;
@@ -120,6 +121,12 @@ public class ForStKeyedStateBackend<K> implements
AsyncKeyedStateBackend<K> {
/** The container of ForSt options. */
private final ForStResourceContainer optionsContainer;
+ /**
+ * Protects access to ForSt in other threads, like the checkpointing
thread from parallel call
+ * that disposes the ForSt object.
+ */
+ private final ResourceGuard resourceGuard;
+
/** Factory function to create column family options from state name. */
private final Function<String, ColumnFamilyOptions>
columnFamilyOptionsFactory;
@@ -183,6 +190,7 @@ public class ForStKeyedStateBackend<K> implements
AsyncKeyedStateBackend<K> {
UUID backendUID,
ExecutionConfig executionConfig,
ForStResourceContainer optionsContainer,
+ ResourceGuard resourceGuard,
int keyGroupPrefixBytes,
TypeSerializer<K> keySerializer,
Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,
@@ -203,6 +211,7 @@ public class ForStKeyedStateBackend<K> implements
AsyncKeyedStateBackend<K> {
this.backendUID = backendUID;
this.executionConfig = executionConfig;
this.optionsContainer = Preconditions.checkNotNull(optionsContainer);
+ this.resourceGuard = resourceGuard;
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.keyGroupRange = keyContext.getKeyGroupRange();
this.keySerializer = keySerializer;
@@ -510,6 +519,14 @@ public class ForStKeyedStateBackend<K> implements
AsyncKeyedStateBackend<K> {
if (this.disposed) {
return;
}
+ // This call will block until all clients that still acquire
access to the ForSt
+ // instance
+ // have released it,
+ // so that we cannot release the native resources while clients
are still working with
+ // it in
+ // parallel.
+ resourceGuard.close();
+
for (StateExecutor executor : managedStateExecutors) {
executor.shutdown();
}
diff --git
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 74a128c4dfd..30b37ed82b2 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -329,6 +329,7 @@ public class ForStKeyedStateBackendBuilder<K>
backendUID,
executionConfig,
this.optionsContainer,
+ forstResourceGuard,
keyGroupPrefixBytes,
this.keySerializerProvider.currentSchemaSerializer(),
serializedKeyBuilder,
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
index 37674de9b86..e4ba427f389 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
@@ -152,11 +152,12 @@ public class ForStNativeFullSnapshotStrategy<K>
final PreviousSnapshot previousSnapshot =
snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
- // Disable file deletion for file transformation. ForSt will decide
whether to allow file
+ ResourceGuard.Lease lease = resourceGuard.acquireResource();
+ // Disable file deletion for file transformation. ForSt will decide
whether to allow
+ // file
// deletion based on the number of calls to disableFileDeletions() and
// enableFileDeletions(), so disableFileDeletions() should be call
only once.
db.disableFileDeletions();
-
try {
// get live files with flush memtable
RocksDB.LiveFiles liveFiles = db.getLiveFiles(true);
@@ -185,6 +186,7 @@ public class ForStNativeFullSnapshotStrategy<K>
() -> {
try {
db.enableFileDeletions(false);
+ lease.close();
LOG.info(
"Release one file deletion lock with
ForStNativeSnapshotResources, backendUID:{}, checkpointId:{}.",
backendUID,
@@ -203,6 +205,7 @@ public class ForStNativeFullSnapshotStrategy<K>
backendUID,
checkpointId);
db.enableFileDeletions(false);
+ lease.close();
throw e;
}
}
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
index 4a9ddcdd49e..7350e8aaf6c 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
@@ -28,15 +28,11 @@ import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManage
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
-import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
-import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
@@ -50,8 +46,6 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.forstdb.RocksDB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
@@ -76,8 +70,6 @@ import java.util.stream.Collectors;
public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>,
AutoCloseable {
- private static final Logger LOG =
LoggerFactory.getLogger(ForStSnapshotStrategyBase.class);
-
@Nonnull private final String description;
/** ForSt instance from the backend. */
@@ -194,28 +186,6 @@ public abstract class ForStSnapshotStrategyBase<K, R
extends SnapshotResources>
@Override
public abstract void close();
- protected void cleanupIncompleteSnapshot(
- @Nonnull CloseableRegistry tmpResourcesRegistry,
- @Nonnull SnapshotDirectory localBackupDirectory) {
- try {
- tmpResourcesRegistry.close();
- } catch (Exception e) {
- LOG.warn("Could not properly clean tmp resources.", e);
- }
-
- if (localBackupDirectory.isSnapshotCompleted()) {
- try {
- DirectoryStateHandle directoryStateHandle =
- localBackupDirectory.completeSnapshotAndGetHandle();
- if (directoryStateHandle != null) {
- directoryStateHandle.discardState();
- }
- } catch (Exception e) {
- LOG.warn("Could not properly discard local state.", e);
- }
- }
- }
-
/** Common operation in native ForSt snapshot result supplier. */
protected abstract class ForStSnapshotOperation
implements SnapshotResultSupplier<KeyedStateHandle> {
@@ -230,27 +200,6 @@ public abstract class ForStSnapshotStrategyBase<K, R
extends SnapshotResources>
this.checkpointStreamFactory = checkpointStreamFactory;
this.tmpResourcesRegistry = new CloseableRegistry();
}
-
- protected Optional<KeyedStateHandle> getLocalSnapshot(
- SnapshotDirectory localBackupDirectory,
- @Nullable StreamStateHandle localStreamStateHandle,
- List<IncrementalKeyedStateHandle.HandleAndLocalPath>
sharedState)
- throws IOException {
- final DirectoryStateHandle directoryStateHandle =
- localBackupDirectory.completeSnapshotAndGetHandle();
- if (directoryStateHandle != null && localStreamStateHandle !=
null) {
- return Optional.of(
- new IncrementalLocalKeyedStateHandle(
- backendUID,
- checkpointId,
- directoryStateHandle,
- keyGroupRange,
- localStreamStateHandle,
- sharedState));
- } else {
- return Optional.empty();
- }
- }
}
/** A {@link SnapshotResources} for native ForSt snapshot. */
@@ -301,42 +250,6 @@ public abstract class ForStSnapshotStrategyBase<K, R
extends SnapshotResources>
}
}
- /** A {@link SnapshotResources} for forst sync snapshot. */
- protected static class ForStSyncSnapshotResources implements
SnapshotResources {
- @Nonnull protected final SnapshotDirectory snapshotDirectory;
-
- @Nonnull protected final PreviousSnapshot previousSnapshot;
-
- @Nonnull protected final List<StateMetaInfoSnapshot>
stateMetaInfoSnapshots;
-
- public ForStSyncSnapshotResources(
- SnapshotDirectory snapshotDirectory,
- PreviousSnapshot previousSnapshot,
- List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
- this.snapshotDirectory = snapshotDirectory;
- this.previousSnapshot = previousSnapshot;
- this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
- }
-
- @Override
- public void release() {
- try {
- if (snapshotDirectory.exists()) {
- LOG.trace(
- "Running cleanup for local RocksDB backup
directory {}.",
- snapshotDirectory);
- boolean cleanupOk = snapshotDirectory.cleanup();
-
- if (!cleanupOk) {
- LOG.debug("Could not properly cleanup local RocksDB
backup directory.");
- }
- }
- } catch (IOException e) {
- LOG.warn("Could not properly cleanup local RocksDB backup
directory.", e);
- }
- }
- }
-
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
new PreviousSnapshot(Collections.emptyList());