This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3bc96001656 [FLINK-37620][state/forst] ForSt Sync mode support remote
storage (#26412)
3bc96001656 is described below
commit 3bc96001656cda6ccaeda22741c75e035505ec1b
Author: Yanfei Lei <[email protected]>
AuthorDate: Wed Apr 9 10:32:51 2025 +0800
[FLINK-37620][state/forst] ForSt Sync mode support remote storage (#26412)
---
.../docs/ops/state/disaggregated_state.md | 14 +++
docs/content/docs/ops/state/disaggregated_state.md | 14 +++
.../shortcodes/generated/forst_configuration.html | 6 ++
.../generated/state_backend_forst_section.html | 6 ++
.../state/forst/ForStKeyedStateBackendBuilder.java | 1 -
.../org/apache/flink/state/forst/ForStOptions.java | 11 ++
.../flink/state/forst/ForStStateBackend.java | 85 +++++++--------
.../state/forst/sync/ForStPriorityQueueConfig.java | 18 ++--
.../forst/sync/ForStSyncKeyedStateBackend.java | 115 +++++++++------------
.../sync/ForStSyncKeyedStateBackendBuilder.java | 72 +++++--------
.../flink/state/forst/ForStStateBackendTest.java | 41 ++++++++
11 files changed, 219 insertions(+), 164 deletions(-)
diff --git a/docs/content.zh/docs/ops/state/disaggregated_state.md
b/docs/content.zh/docs/ops/state/disaggregated_state.md
index e59a765766b..d18458cef71 100644
--- a/docs/content.zh/docs/ops/state/disaggregated_state.md
+++ b/docs/content.zh/docs/ops/state/disaggregated_state.md
@@ -150,6 +150,20 @@ state.backend.forst.primary-dir:
s3://your-bucket/forst-state
checkpoint and fast recovery, since the ForSt will perform file copy between
the primary
storage location and the checkpoint directory during checkpointing and
recovery.
+#### ForSt Local Storage Location
+
+By default, ForSt will **ONLY** disaggregate state when asynchronous APIs
(State V2) are used. When
+using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve
as **local state store**.
+Since a job may contain multiple ForSt instances with mixed API usage, combining synchronous local state access
+with asynchronous remote state access can help achieve better overall throughput.
+If you want operators that use the synchronous state APIs to store their state remotely as well, use the following configuration:
+```yaml
+state.backend.forst.sync.enforce-local: false
+```
+You can specify the local storage location via:
+```yaml
+state.backend.forst.local-dir: path-to-local-dir
+```
#### ForSt File Cache
diff --git a/docs/content/docs/ops/state/disaggregated_state.md
b/docs/content/docs/ops/state/disaggregated_state.md
index e59a765766b..5d2a757f186 100644
--- a/docs/content/docs/ops/state/disaggregated_state.md
+++ b/docs/content/docs/ops/state/disaggregated_state.md
@@ -150,6 +150,20 @@ state.backend.forst.primary-dir:
s3://your-bucket/forst-state
checkpoint and fast recovery, since the ForSt will perform file copy between
the primary
storage location and the checkpoint directory during checkpointing and
recovery.
+#### ForSt Local Storage Location
+
+By default, ForSt will **ONLY** disaggregate state when asynchronous APIs
(State V2) are used. When
+using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve
as **local state store**.
+Since a job may contain multiple ForSt instances with mixed API usage, combining synchronous local state access
+with asynchronous remote state access can help achieve better overall throughput.
+If you want operators that use the synchronous state APIs to store their state remotely as well, use the following configuration:
+```yaml
+state.backend.forst.sync.enforce-local: false
+```
+You can specify the local storage location via:
+```yaml
+state.backend.forst.local-dir: path-to-local-dir
+```
#### ForSt File Cache
diff --git a/docs/layouts/shortcodes/generated/forst_configuration.html
b/docs/layouts/shortcodes/generated/forst_configuration.html
index 5b5b50ac970..f66c1af4d8b 100644
--- a/docs/layouts/shortcodes/generated/forst_configuration.html
+++ b/docs/layouts/shortcodes/generated/forst_configuration.html
@@ -116,6 +116,12 @@
<td>String</td>
<td>The primary directory where ForSt puts its SST files. By
default, it will be the same as the checkpoint directory. Recognized shortcut
name is 'checkpoint-dir', which means that ForSt shares the directory with
checkpoint, and 'local-dir', which means that ForSt will use the local
directory of TaskManager.</td>
</tr>
+ <tr>
+ <td><h5>state.backend.forst.sync.enforce-local</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to enforce local state for operators in synchronous
mode when enabling disaggregated state. This is useful in cases where both
synchronous operators and asynchronous operators are used in the same job.</td>
+ </tr>
<tr>
<td><h5>state.backend.forst.timer-service.cache-size</h5></td>
<td style="word-wrap: break-word;">128</td>
diff --git a/docs/layouts/shortcodes/generated/state_backend_forst_section.html
b/docs/layouts/shortcodes/generated/state_backend_forst_section.html
index f4782714f39..6a7ebbfbd93 100644
--- a/docs/layouts/shortcodes/generated/state_backend_forst_section.html
+++ b/docs/layouts/shortcodes/generated/state_backend_forst_section.html
@@ -50,6 +50,12 @@
<td>String</td>
<td>The primary directory where ForSt puts its SST files. By
default, it will be the same as the checkpoint directory. Recognized shortcut
name is 'checkpoint-dir', which means that ForSt shares the directory with
checkpoint, and 'local-dir', which means that ForSt will use the local
directory of TaskManager.</td>
</tr>
+ <tr>
+ <td><h5>state.backend.forst.sync.enforce-local</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to enforce local state for operators in synchronous
mode when enabling disaggregated state. This is useful in cases where both
synchronous operators and asynchronous operators are used in the same job.</td>
+ </tr>
<tr>
<td><h5>state.backend.forst.timer-service.cache-size</h5></td>
<td style="word-wrap: break-word;">128</td>
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 30b37ed82b2..bfb2291be32 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -359,7 +359,6 @@ public class ForStKeyedStateBackendBuilder<K>
// env. We expect to directly use the dfs directory in flink env or
local directory as
// working dir. We will implement this in ForStDB later, but before
that, we achieved this
// by setting the dbPath to "/" when the dfs directory existed.
- // TODO: use localForStPath as dbPath after ForSt Support mixing
local-dir and remote-dir
Path instanceForStPath =
optionsContainer.getRemoteForStPath() == null
? optionsContainer.getLocalForStPath()
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
index 6d27d9b2a7b..8a816e3e1b1 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
@@ -66,6 +66,17 @@ public class ForStOptions {
CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT,
LOCAL_DIR_AS_PRIMARY_SHORTCUT));
+ @Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
+ public static final ConfigOption<Boolean> SYNC_ENFORCE_LOCAL =
+ ConfigOptions.key("state.backend.forst.sync.enforce-local")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to enforce local state for operators in
synchronous mode when"
+ + " enabling disaggregated state. This is
useful in cases where "
+ + "both synchronous operators and
asynchronous operators are used "
+ + "in the same job.");
+
@Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
public static final ConfigOption<String> CACHE_DIRECTORY =
ConfigOptions.key("state.backend.forst.cache.dir")
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
index 6dc4dba156d..57a8a729f45 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
@@ -188,8 +189,12 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
/** The recovery claim mode. */
private RecoveryClaimMode recoveryClaimMode = RecoveryClaimMode.DEFAULT;
+ /** Whether to share the ForSt remote directory with checkpoint directory.
*/
private boolean remoteShareWithCheckpoint;
+ /** Whether to use local directory as primary directory in synchronous
mode. */
+ private boolean forceSyncLocal;
+
// ------------------------------------------------------------------------
/** Creates a new {@code ForStStateBackend} for storing state. */
@@ -203,6 +208,7 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
this.remoteShareWithCheckpoint = false;
+ this.forceSyncLocal = true;
}
/**
@@ -237,6 +243,7 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
: new Path(remoteDirStr);
}
}
+ this.forceSyncLocal = config.get(ForStOptions.SYNC_ENFORCE_LOCAL);
this.priorityQueueConfig =
ForStPriorityQueueConfig.fromOtherAndConfiguration(
@@ -409,31 +416,7 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
lazyInitializeForJob(env, fileCompatibleIdentifier);
- String opChildPath =
- String.format(
- "op_%s_attempt_%s",
- fileCompatibleIdentifier,
env.getTaskInfo().getAttemptNumber());
-
- Path localBasePath =
- new Path(
- new File(new File(getNextStoragePath(),
jobId.toHexString()), opChildPath)
- .getAbsolutePath());
- Path remoteBasePath = null;
- if (remoteForStDirectory != null) {
- remoteBasePath =
- new Path(new Path(remoteForStDirectory,
jobId.toHexString()), opChildPath);
- } else if (remoteShareWithCheckpoint) {
- if (env.getCheckpointStorageAccess() instanceof
FsCheckpointStorageAccess) {
- Path sharedStateDirectory =
- ((FsCheckpointStorageAccess)
env.getCheckpointStorageAccess())
- .getSharedStateDirectory();
- remoteBasePath = new Path(sharedStateDirectory, opChildPath);
- LOG.info("Set remote ForSt directory to checkpoint directory
{}", remoteBasePath);
- } else {
- LOG.warn(
- "Remote ForSt directory can't be set, because
checkpoint directory isn't on file system.");
- }
- }
+ Tuple2<Path, Path> localAndRemoteBasePath =
getForStBasePath(fileCompatibleIdentifier, env);
final OpaqueMemoryResource<ForStSharedResources> sharedResources =
ForStOperationUtils.allocateSharedCachesIfConfigured(
@@ -448,8 +431,8 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
final ForStResourceContainer resourceContainer =
createOptionsAndResourceContainer(
sharedResources,
- localBasePath,
- remoteBasePath,
+ localAndRemoteBasePath.f0,
+ localAndRemoteBasePath.f1,
env.getCheckpointStorageAccess(),
parameters.getMetricGroup(),
nativeMetricOptions.isStatisticsEnabled());
@@ -505,17 +488,7 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
lazyInitializeForJob(env, fileCompatibleIdentifier);
- Path instanceBasePath =
- new Path(
- new File(
- getNextStoragePath(),
- "job_"
- + jobId
- + "_op_"
- + fileCompatibleIdentifier
- + "_uuid_"
- + UUID.randomUUID())
- .getAbsolutePath());
+ Tuple2<Path, Path> localAndRemoteBasePath =
getForStBasePath(fileCompatibleIdentifier, env);
LocalRecoveryConfig localRecoveryConfig =
env.getTaskStateManager().createLocalRecoveryConfig();
@@ -533,10 +506,10 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
final ForStResourceContainer resourceContainer =
createOptionsAndResourceContainer(
sharedResources,
- instanceBasePath,
- null,
+ localAndRemoteBasePath.f0,
+ forceSyncLocal ? null : localAndRemoteBasePath.f1,
env.getCheckpointStorageAccess(),
- null,
+ parameters.getMetricGroup(),
nativeMetricOptions.isStatisticsEnabled());
ExecutionConfig executionConfig = env.getExecutionConfig();
@@ -549,7 +522,6 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
new ForStSyncKeyedStateBackendBuilder<>(
parameters.getOperatorIdentifier(),
env.getUserCodeClassLoader().asClassLoader(),
- instanceBasePath,
resourceContainer,
stateName ->
resourceContainer.getColumnOptions(),
parameters.getKvStateRegistry(),
@@ -818,6 +790,35 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
return configuration;
}
+ Tuple2<Path, Path> getForStBasePath(String operatorIdentifier, Environment
env) {
+ String opChildPath =
+ String.format(
+ "op_%s_attempt_%s",
+ operatorIdentifier,
env.getTaskInfo().getAttemptNumber());
+
+ Path localBasePath =
+ new Path(
+ new File(new File(getNextStoragePath(),
jobId.toHexString()), opChildPath)
+ .getAbsolutePath());
+ Path remoteBasePath = null;
+ if (remoteForStDirectory != null) {
+ remoteBasePath =
+ new Path(new Path(remoteForStDirectory,
jobId.toHexString()), opChildPath);
+ } else if (remoteShareWithCheckpoint) {
+ if (env.getCheckpointStorageAccess() instanceof
FsCheckpointStorageAccess) {
+ Path sharedStateDirectory =
+ ((FsCheckpointStorageAccess)
env.getCheckpointStorageAccess())
+ .getSharedStateDirectory();
+ remoteBasePath = new Path(sharedStateDirectory, opChildPath);
+ LOG.info("Set remote ForSt directory to checkpoint directory
{}", remoteBasePath);
+ } else {
+ LOG.warn(
+ "Remote ForSt directory can't be set, because
checkpoint directory isn't on file system.");
+ }
+ }
+ return Tuple2.of(localBasePath, remoteBasePath);
+ }
+
@VisibleForTesting
ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path
localBasePath) {
return createOptionsAndResourceContainer(null, localBasePath, null,
null, null, false);
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java
index a60907ed7af..afbd3bfe6a6 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java
@@ -28,7 +28,7 @@ import static
org.apache.flink.state.forst.ForStOptions.FORST_TIMER_SERVICE_FACT
import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY;
import static org.apache.flink.util.Preconditions.checkNotNull;
-/** The configuration of rocksDB priority queue state implementation. */
+/** The configuration of ForSt priority queue state implementation. */
public class ForStPriorityQueueConfig implements Serializable {
private static final long serialVersionUID = 1L;
@@ -39,7 +39,7 @@ public class ForStPriorityQueueConfig implements Serializable
{
private @Nullable ForStStateBackend.PriorityQueueStateType
priorityQueueStateType;
/** cache size per keyGroup for rocksDB priority queue state. */
- private int rocksDBPriorityQueueSetCacheSize;
+ private int forStDBPriorityQueueSetCacheSize;
public ForStPriorityQueueConfig() {
this(null, UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE);
@@ -47,9 +47,9 @@ public class ForStPriorityQueueConfig implements Serializable
{
public ForStPriorityQueueConfig(
ForStStateBackend.PriorityQueueStateType priorityQueueStateType,
- int rocksDBPriorityQueueSetCacheSize) {
+ int forStDBPriorityQueueSetCacheSize) {
this.priorityQueueStateType = priorityQueueStateType;
- this.rocksDBPriorityQueueSetCacheSize =
rocksDBPriorityQueueSetCacheSize;
+ this.forStDBPriorityQueueSetCacheSize =
forStDBPriorityQueueSetCacheSize;
}
/**
@@ -70,10 +70,10 @@ public class ForStPriorityQueueConfig implements
Serializable {
* Gets the cache size of rocksDB priority queue set. It will fall back to
the default value if
* it is not explicitly set.
*/
- public int getRocksDBPriorityQueueSetCacheSize() {
- return rocksDBPriorityQueueSetCacheSize ==
UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE
+ public int getForStDBPriorityQueueSetCacheSize() {
+ return forStDBPriorityQueueSetCacheSize ==
UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE
? FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE.defaultValue()
- : rocksDBPriorityQueueSetCacheSize;
+ : forStDBPriorityQueueSetCacheSize;
}
public static ForStPriorityQueueConfig fromOtherAndConfiguration(
@@ -83,10 +83,10 @@ public class ForStPriorityQueueConfig implements
Serializable {
? config.get(TIMER_SERVICE_FACTORY)
: other.priorityQueueStateType;
int cacheSize =
- (other.rocksDBPriorityQueueSetCacheSize
+ (other.forStDBPriorityQueueSetCacheSize
==
UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE)
? config.get(FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE)
- : other.rocksDBPriorityQueueSetCacheSize;
+ : other.forStDBPriorityQueueSetCacheSize;
return new ForStPriorityQueueConfig(priorityQueueType, cacheSize);
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index da26ae0d937..783031c1386 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -30,7 +30,6 @@ import
org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.ICloseableRegistry;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -64,7 +63,6 @@ import org.apache.flink.state.forst.ForStNativeMetricMonitor;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
-import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
@@ -85,7 +83,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Closeable;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -105,7 +102,7 @@ import static
org.apache.flink.runtime.state.SnapshotExecutionType.ASYNCHRONOUS;
import static org.apache.flink.util.Preconditions.checkState;
/**
- * An {@link AbstractKeyedStateBackend} that stores its state in {@code
RocksDB} and serializes
+ * An {@link AbstractKeyedStateBackend} that stores its state in {@code
ForStDB} and serializes
* state to streams provided by a {@link
org.apache.flink.runtime.state.CheckpointStreamFactory}
* upon checkpointing. This state backend can store very large state that
exceeds memory and spills
* to disk. Except for the snapshotting, this class should be accessed as if
it is not threadsafe.
@@ -178,17 +175,14 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
/** Factory function to create column family options from state name. */
private final Function<String, ColumnFamilyOptions>
columnFamilyOptionsFactory;
- /** The container of RocksDB option factory and predefined options. */
+ /** The container of ForSt option factory and predefined options. */
private final ForStResourceContainer optionsContainer;
- /** Path where this configured instance stores its data directory. */
- private final Path instanceBasePath;
-
/**
- * Protects access to RocksDB in other threads, like the checkpointing
thread from parallel call
- * that disposes the RocksDB object.
+ * Protects access to ForSt in other threads, like the checkpointing
thread from parallel call
+ * that disposes the ForSt object.
*/
- private final ResourceGuard rocksDBResourceGuard;
+ private final ResourceGuard forstResourceGuard;
/** The write options to use in the states. We disable write ahead
logging. */
private final WriteOptions writeOptions;
@@ -228,7 +222,7 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
*/
private final ColumnFamilyHandle defaultColumnFamily;
- /** Shared wrapper for batch writes to the RocksDB instance. */
+ /** Shared wrapper for batch writes to the ForSt instance. */
private final ForStDBWriteBatchWrapper writeBatchWrapper;
/**
@@ -244,14 +238,14 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
private final PriorityQueueSetFactory priorityQueueFactory;
/**
- * Helper to build the byte arrays of composite keys to address data in
RocksDB. Shared across
- * all states.
+ * Helper to build the byte arrays of composite keys to address data in
ForSt. Shared across all
+ * states.
*/
private final SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
/**
- * Our RocksDB database, this is used by the actual subclasses of {@link
AbstractForStSyncState}
- * to store state. The different k/v states that we have don't each have
their own RocksDB
+ * Our ForSt database, this is used by the actual subclasses of {@link
AbstractForStSyncState}
+ * to store state. The different k/v states that we have don't each have
their own ForSt
* instance. They all write to this instance but to their own column
family.
*/
protected final RocksDB db;
@@ -263,7 +257,6 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
public ForStSyncKeyedStateBackend(
ClassLoader userCodeClassLoader,
- Path instanceBasePath,
ForStResourceContainer optionsContainer,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
TaskKvStateRegistry kvStateRegistry,
@@ -277,7 +270,7 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
int keyGroupPrefixBytes,
CloseableRegistry cancelStreamRegistry,
StreamCompressionDecorator keyGroupCompressionDecorator,
- ResourceGuard rocksDBResourceGuard,
+ ResourceGuard forstResourceGuard,
ForStSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy,
ForStDBWriteBatchWrapper writeBatchWrapper,
ColumnFamilyHandle defaultColumnFamilyHandle,
@@ -307,8 +300,6 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
this.optionsContainer = Preconditions.checkNotNull(optionsContainer);
- this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
-
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.kvStateInformation = kvStateInformation;
this.createdKVStates = new HashMap<>();
@@ -317,7 +308,7 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
this.readOptions = optionsContainer.getReadOptions();
this.writeBatchSize = writeBatchSize;
this.db = db;
- this.rocksDBResourceGuard = rocksDBResourceGuard;
+ this.forstResourceGuard = forstResourceGuard;
this.checkpointSnapshotStrategy = checkpointSnapshotStrategy;
this.writeBatchWrapper = writeBatchWrapper;
this.defaultColumnFamily = defaultColumnFamilyHandle;
@@ -360,7 +351,8 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
namespace, namespaceSerializer, namespaceOutputView,
ambiguousKeyPossible);
nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
} catch (IOException ex) {
- throw new FlinkRuntimeException("Failed to get keys from RocksDB
state backend.", ex);
+ throw new FlinkRuntimeException(
+ "Failed to get keys from ForSt sync state backend.", ex);
}
ForStIteratorWrapper iterator =
@@ -398,6 +390,7 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
return Stream.empty();
}
+ @SuppressWarnings("unchecked")
RegisteredKeyValueStateBackendMetaInfo<N, ?>
registeredKeyValueStateBackendMetaInfo =
(RegisteredKeyValueStateBackendMetaInfo<N, ?>)
columnInfo.metaInfo;
@@ -428,12 +421,6 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
return targetStream.onClose(iteratorWrapper::close);
}
- @VisibleForTesting
- ColumnFamilyHandle getColumnFamilyHandle(String state) {
- ForStOperationUtils.ForStKvStateInfo columnInfo =
kvStateInformation.get(state);
- return columnInfo != null ? columnInfo.columnFamilyHandle : null;
- }
-
@Override
public void setCurrentKey(K newKey) {
super.setCurrentKey(newKey);
@@ -454,11 +441,11 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
}
super.dispose();
- // This call will block until all clients that still acquire access to
the RocksDB instance
+ // This call will block until all clients that still acquire access to
the ForSt instance
// have released it,
// so that we cannot release the native resources while clients are
still working with it in
// parallel.
- rocksDBResourceGuard.close();
+ forstResourceGuard.close();
// IMPORTANT: null reference to signal potential async checkpoint
workers that the db was
// disposed, as
@@ -467,7 +454,7 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
IOUtils.closeQuietly(writeBatchWrapper);
// Metric collection occurs on a background thread. When this
method returns
- // it is guaranteed that thr RocksDB reference has been invalidated
+ // it is guaranteed that the ForSt reference has been invalidated
// and no more metric collection will be attempted against the
database.
if (nativeMetricMonitor != null) {
nativeMetricMonitor.close();
@@ -497,11 +484,24 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
columnFamilyOptions.forEach(IOUtils::closeQuietly);
+ LOG.info(
+ "Closed ForSt State Backend. Cleaning up ForSt local
working directory {}, remote working directory {}.",
+ optionsContainer.getLocalBasePath(),
+ optionsContainer.getRemoteBasePath());
+
+ try {
+ optionsContainer.clearDirectories();
+ } catch (Exception ex) {
+ LOG.warn(
+ "Could not delete ForSt local working directory {},
remote working directory {}.",
+ optionsContainer.getLocalBasePath(),
+ optionsContainer.getRemoteBasePath(),
+ ex);
+ }
+
IOUtils.closeQuietly(optionsContainer);
kvStateInformation.clear();
-
- cleanInstanceBasePath();
}
IOUtils.closeQuietly(checkpointSnapshotStrategy);
this.disposed = true;
@@ -531,18 +531,6 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
}
}
- private void cleanInstanceBasePath() {
- LOG.info(
- "Closed RocksDB State Backend. Cleaning up RocksDB working
directory {}.",
- instanceBasePath);
-
- try {
- FileUtils.deleteDirectory(new File(instanceBasePath.getPath()));
- } catch (IOException ex) {
- LOG.warn("Could not delete RocksDB working directory: {}",
instanceBasePath, ex);
- }
- }
-
// ------------------------------------------------------------------------
// Getters and Setters
// ------------------------------------------------------------------------
@@ -568,15 +556,10 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
return sharedRocksKeyBuilder;
}
- @VisibleForTesting
- boolean isDisposed() {
- return this.disposed;
- }
-
/**
- * Triggers an asynchronous snapshot of the keyed state backend from
RocksDB. This snapshot can
- * be canceled and is also stopped when the backend is closed through
{@link #dispose()}. For
- * each backend, this method must always be called by the same thread.
+ * Triggers an asynchronous snapshot of the keyed state backend from
ForSt. This snapshot can be
+ * canceled and is also stopped when the backend is closed through {@link
#dispose()}. For each
+ * backend, this method must always be called by the same thread.
*
* @param checkpointId The Id of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
@@ -627,11 +610,11 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
}
/**
- * Registers a k/v state information, which includes its state id, type,
RocksDB column family
+ * Registers a k/v state information, which includes its state id, type,
ForSt column family
* handle, and serializers.
*
* <p>When restoring from a snapshot, we don’t restore the individual k/v
states, just the
- * global RocksDB database and the list of k/v state information. When a
k/v state is first
+ * global ForSt database and the list of k/v state information. When a k/v
state is first
* requested we check here whether we already have a registered entry for
that and return it
* (after some necessary state compatibility checks) or create a new one
if it does not exist.
*/
@@ -757,7 +740,7 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
}
/**
- * Migrate only the state value, that is the "value" that is stored in
RocksDB. We don't migrate
+ * Migrate only the state value, that is the "value" that is stored in
ForSt. We don't migrate
* the key here, which is made up of key group, key, namespace and map key
(in case of
* MapState).
*/
@@ -805,9 +788,9 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
}
@SuppressWarnings("unchecked")
- AbstractForStSyncState<?, ?, SV> rocksDBState =
(AbstractForStSyncState<?, ?, SV>) state;
+ AbstractForStSyncState<?, ?, SV> forStState =
(AbstractForStSyncState<?, ?, SV>) state;
- Snapshot rocksDBSnapshot = db.getSnapshot();
+ Snapshot forstSnapshot = db.getSnapshot();
try (ForStIteratorWrapper iterator =
ForStOperationUtils.getForStIterator(db,
stateMetaInfo.f0, readOptions);
ForStDBWriteBatchWrapper batchWriter =
@@ -822,7 +805,7 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
while (iterator.isValid()) {
serializedValueInput.setBuffer(iterator.value());
- rocksDBState.migrateSerializedValue(
+ forStState.migrateSerializedValue(
serializedValueInput,
migratedSerializedValueOutput,
stateMetaInfo.f1.getPreviousStateSerializer(),
@@ -837,8 +820,8 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
iterator.next();
}
} finally {
- db.releaseSnapshot(rocksDBSnapshot);
- rocksDBSnapshot.close();
+ db.releaseSnapshot(forstSnapshot);
+ forstSnapshot.close();
}
}
@@ -925,11 +908,6 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
"State %s is not supported by %s", stateDesc.getClass(),
this.getClass());
}
- /** Only visible for testing, DO NOT USE. */
- Path getInstanceBasePath() {
- return instanceBasePath;
- }
-
@VisibleForTesting
@Override
public int numKeyValueStateEntries() {
@@ -967,4 +945,9 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
long getWriteBatchSize() {
return writeBatchSize;
}
+
+ @VisibleForTesting
+ public ForStResourceContainer getOptionsContainer() {
+ return optionsContainer;
+ }
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
index 54ced97fa22..64ea50ab850 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
@@ -104,12 +104,10 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
*/
public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBackendBuilder<K> {
- private static final String DB_INSTANCE_DIR_STRING = "db";
-
/** String that identifies the operator that owns this backend. */
private final String operatorIdentifier;
- /** The configuration of rocksDB priorityQueue state. */
+ /** The configuration of ForSt priorityQueue state. */
private final ForStPriorityQueueConfig priorityQueueConfig;
/** The configuration of local recovery. */
@@ -118,22 +116,16 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
/** Factory function to create column family options from state name. */
private final Function<String, ColumnFamilyOptions>
columnFamilyOptionsFactory;
- /** The container of RocksDB option factory and predefined options. */
+ /** The container of ForSt option factory and predefined options. */
private final ForStResourceContainer optionsContainer;
- /** Path where this configured instance stores its data directory. */
- private final Path instanceBasePath;
-
- /** Path where this configured instance stores its RocksDB database. */
- private final Path instanceForStDBPath;
-
private final MetricGroup metricGroup;
private final StateBackend.CustomInitializationMetrics
customInitializationMetrics;
/** True if incremental checkpointing is enabled. */
private boolean enableIncrementalCheckpointing;
- /** RocksDB property-based and statistics-based native metrics options. */
+ /** ForSt property-based and statistics-based native metrics options. */
private ForStNativeMetricOptions nativeMetricOptions;
private long writeBatchSize =
@@ -154,7 +146,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
public ForStSyncKeyedStateBackendBuilder(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
- Path instanceBasePath,
ForStResourceContainer optionsContainer,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
TaskKvStateRegistry kvStateRegistry,
@@ -190,8 +181,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
// ensure that we use the right merge operator, because other code
relies on this
this.columnFamilyOptionsFactory =
Preconditions.checkNotNull(columnFamilyOptionsFactory);
this.optionsContainer = optionsContainer;
- this.instanceBasePath = instanceBasePath;
- this.instanceForStDBPath = getInstanceRocksDBPath(instanceBasePath);
this.metricGroup = metricGroup;
this.customInitializationMetrics = customInitializationMetrics;
this.enableIncrementalCheckpointing = false;
@@ -203,7 +192,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
ForStSyncKeyedStateBackendBuilder(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
- Path instanceBasePath,
ForStResourceContainer optionsContainer,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
TaskKvStateRegistry kvStateRegistry,
@@ -212,7 +200,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
LocalRecoveryConfig localRecoveryConfig,
- ForStPriorityQueueConfig rocksDBPriorityQueueConfig,
+ ForStPriorityQueueConfig forStPriorityQueueConfig,
TtlTimeProvider ttlTimeProvider,
LatencyTrackingStateConfig latencyTrackingStateConfig,
MetricGroup metricGroup,
@@ -224,7 +212,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
this(
operatorIdentifier,
userCodeClassLoader,
- instanceBasePath,
optionsContainer,
columnFamilyOptionsFactory,
kvStateRegistry,
@@ -233,7 +220,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
keyGroupRange,
executionConfig,
localRecoveryConfig,
- rocksDBPriorityQueueConfig,
+ forStPriorityQueueConfig,
ttlTimeProvider,
latencyTrackingStateConfig,
metricGroup,
@@ -263,10 +250,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
return this;
}
- public static Path getInstanceRocksDBPath(Path instanceBasePath) {
- return new Path(instanceBasePath, DB_INSTANCE_DIR_STRING);
- }
-
private static void checkAndCreateDirectory(File directory) throws
IOException {
if (directory.exists()) {
if (!directory.isDirectory()) {
@@ -274,7 +257,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
}
} else if (!directory.mkdirs()) {
throw new IOException(
- String.format("Could not create RocksDB data directory at
%s.", directory));
+ String.format("Could not create ForSt data directory at
%s.", directory));
}
}
@@ -300,7 +283,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
ForStSnapshotStrategyBase<K, ?> checkpointStrategy = null;
- ResourceGuard rocksDBResourceGuard = new ResourceGuard();
+ ResourceGuard forStResourceGuard = new ResourceGuard();
PriorityQueueSetFactory priorityQueueFactory;
SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
// Number of bytes required to prefix the key groups.
@@ -316,7 +299,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
UUID backendUID = UUID.randomUUID();
SortedMap<Long, Collection<HandleAndLocalPath>>
materializedSstFiles = new TreeMap<>();
long lastCompletedCheckpointId = -1L;
- prepareDirectories();
+ optionsContainer.prepareDirectories();
restoreOperation =
getForStDBRestoreOperation(
keyGroupPrefixBytes,
@@ -346,7 +329,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
checkpointStrategy =
initializeSnapshotStrategy(
db,
- rocksDBResourceGuard,
+ forStResourceGuard,
keySerializerProvider.currentSchemaSerializer(),
kvStateInformation,
keyGroupRange,
@@ -369,7 +352,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
new ArrayList<>(kvStateInformation.values().size());
IOUtils.closeQuietly(cancelRegistryForBackend);
IOUtils.closeQuietly(writeBatchWrapper);
- IOUtils.closeQuietly(rocksDBResourceGuard);
+ IOUtils.closeQuietly(forStResourceGuard);
ForStOperationUtils.addColumnFamilyOptionsToCloseLater(
columnFamilyOptions, defaultColumnFamilyHandle);
IOUtils.closeQuietly(defaultColumnFamilyHandle);
@@ -388,9 +371,11 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
kvStateInformation.clear();
try {
- FileUtils.deleteDirectory(new
File(instanceBasePath.getPath()));
+ FileUtils.deleteDirectory(new
File(optionsContainer.getBasePath().getPath()));
} catch (Exception ex) {
- logger.warn("Failed to delete base path for RocksDB: " +
instanceBasePath, ex);
+ logger.warn(
+ "Failed to delete base path for ForSt: " +
optionsContainer.getBasePath(),
+ ex);
}
// Log and rethrow
if (e instanceof BackendBuildingException) {
@@ -403,10 +388,11 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
}
InternalKeyContext<K> keyContext =
new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
- logger.info("Finished building RocksDB keyed state-backend at {}.",
instanceBasePath);
+ logger.info(
+ "Finished building ForSt keyed state-backend at {}.",
+ optionsContainer.getBasePath());
return new ForStSyncKeyedStateBackend<>(
this.userCodeClassLoader,
- this.instanceBasePath,
this.optionsContainer,
columnFamilyOptionsFactory,
this.kvStateRegistry,
@@ -420,7 +406,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
keyGroupPrefixBytes,
cancelRegistryForBackend,
this.keyGroupCompressionDecorator,
- rocksDBResourceGuard,
+ forStResourceGuard,
checkpointStrategy,
writeBatchWrapper,
defaultColumnFamilyHandle,
@@ -513,11 +499,15 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
// env. We expect to directly use the dfs directory in flink env or
local directory as
// working dir. We will implement this in ForStDB later, but before
that, we achieved this
// by setting the dbPath to "/" when the dfs directory existed.
+ Path instanceForStPath =
+ optionsContainer.getRemoteForStPath() == null
+ ? optionsContainer.getLocalForStPath()
+ : new Path("/db");
if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
return new ForStNoneRestoreOperation(
Collections.emptyMap(),
- instanceForStDBPath,
+ instanceForStPath,
optionsContainer.getDbOptions(),
columnFamilyOptionsFactory,
nativeMetricOptions,
@@ -538,7 +528,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
keySerializerProvider,
optionsContainer,
optionsContainer.getBasePath(),
- instanceForStDBPath,
+ instanceForStPath,
optionsContainer.getDbOptions(),
columnFamilyOptionsFactory,
nativeMetricOptions,
@@ -565,7 +555,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
registeredPQStates,
createHeapQueueFactory(),
keySerializerProvider,
- instanceForStDBPath,
+ instanceForStPath,
optionsContainer.getDbOptions(),
columnFamilyOptionsFactory,
nativeMetricOptions,
@@ -605,7 +595,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
nativeMetricMonitor,
columnFamilyOptionsFactory,
optionsContainer.getWriteBufferManagerCapacity(),
-
priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize());
+
priorityQueueConfig.getForStDBPriorityQueueSetCacheSize());
break;
default:
throw new IllegalArgumentException(
@@ -618,14 +608,4 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
private HeapPriorityQueueSetFactory createHeapQueueFactory() {
return new HeapPriorityQueueSetFactory(keyGroupRange,
numberOfKeyGroups, 128);
}
-
- private void prepareDirectories() throws IOException {
- File baseFile = new File(instanceBasePath.getPath());
- checkAndCreateDirectory(baseFile);
- if (new File(instanceForStDBPath.getPath()).exists()) {
- // Clear the base directory when the backend is created
- // in case something crashed and the backend never reached
dispose()
- FileUtils.deleteDirectory(baseFile);
- }
- }
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
index 4c4d853d70a..7c4e58e1ce6 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
@@ -18,26 +18,35 @@
package org.apache.flink.state.forst;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateBackendTestBase;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.function.SupplierWithException;
+import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static
org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
@@ -112,4 +121,36 @@ class ForStStateBackendTest extends
StateBackendTestBase<ForStStateBackend> {
backend = backend.configure(config,
Thread.currentThread().getContextClassLoader());
assertThat(backend.isIncrementalCheckpointsEnabled()).isTrue();
}
+
+ @TestTemplate
+ void testCreateKeyedStateBackend() throws Exception {
+ Assumptions.assumeFalse(
+ getCheckpointStorage() instanceof JobManagerCheckpointStorage,
+ "Skip JM checkpoint storage");
+ ForStStateBackend backend = new ForStStateBackend();
+ ForStSyncKeyedStateBackend keyedStateBackend1 =
+ (ForStSyncKeyedStateBackend)
createKeyedBackend(IntSerializer.INSTANCE);
+
assertThat(keyedStateBackend1.getOptionsContainer().getRemoteBasePath()).isNull();
+ Configuration config = new Configuration();
+ config.set(ForStOptions.SYNC_ENFORCE_LOCAL, false);
+ backend = backend.configure(config,
Thread.currentThread().getContextClassLoader());
+ ForStSyncKeyedStateBackend keyedStateBackend2 =
+ (ForStSyncKeyedStateBackend)
+ backend.createKeyedStateBackend(
+ new KeyedStateBackendParametersImpl<>(
+ env,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 10,
+ KeyGroupRange.of(0, 9),
+ env.getTaskKvStateRegistry(),
+ TtlTimeProvider.DEFAULT,
+ getMetricGroup(),
+ getCustomInitializationMetrics(),
+ Collections.emptyList(),
+ new CloseableRegistry(),
+ 1.0d));
+
assertThat(keyedStateBackend2.getOptionsContainer().getRemoteBasePath()).isNotNull();
+ }
}