This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d567a9dee4d [FLINK-38462] Introduce state backend type identifier
d567a9dee4d is described below
commit d567a9dee4d54f0401058e7c07081cf05adbd2c1
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Sep 30 16:33:19 2025 +0200
[FLINK-38462] Introduce state backend type identifier
---
.../org/apache/flink/api/common/state/KeyedStateStore.java | 7 +++++++
.../test/java/org/apache/flink/cep/utils/TestSharedBuffer.java | 5 +++++
.../flink/state/api/input/MultiStateKeyIteratorTest.java | 5 +++++
.../org/apache/flink/runtime/state/AsyncKeyedStateBackend.java | 8 ++++++++
.../org/apache/flink/runtime/state/DefaultKeyedStateStore.java | 9 +++++++++
.../java/org/apache/flink/runtime/state/KeyedStateBackend.java | 8 ++++++++
.../apache/flink/runtime/state/heap/HeapKeyedStateBackend.java | 6 ++++++
.../state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java | 5 +++++
.../sorted/state/BatchExecutionKeyedStateBackend.java | 5 +++++
.../org/apache/flink/runtime/state/StateBackendTestUtils.java | 10 ++++++++++
.../flink/runtime/state/ttl/mock/MockKeyedStateBackend.java | 5 +++++
.../flink/runtime/state/v2/AbstractKeyedStateTestBase.java | 5 +++++
.../apache/flink/streaming/runtime/tasks/TestStateBackend.java | 5 +++++
.../flink/state/changelog/ChangelogKeyedStateBackend.java | 5 +++++
.../changelog/restore/ChangelogMigrationRestoreTarget.java | 5 +++++
.../org/apache/flink/state/forst/ForStKeyedStateBackend.java | 6 ++++++
.../flink/state/forst/sync/ForStSyncKeyedStateBackend.java | 6 ++++++
.../apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java | 6 ++++++
18 files changed, 111 insertions(+)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 69c513c388a..a9f6dbda223 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -338,4 +338,11 @@ public interface KeyedStateStore {
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<
IN, ACC, OUT>
stateProperties);
+
+ /**
+ * @return fixed lower-case string identifying the type of the underlying
state backend, e.g.
+ * rocksdb, hashmap, forst, batch.
+ */
+ @Experimental
+ String getBackendTypeIdentifier();
}
diff --git
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 1676c3de5c4..a741ad07512 100644
---
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -287,6 +287,11 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> {
throw new UnsupportedOperationException();
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return "mock";
+ }
+
private class CountingIterator<T> implements Iterator<T> {
private final Iterator<T> iterator;
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
index 2045cc81e66..260df82dfa4 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
@@ -324,6 +324,11 @@ public class MultiStateKeyIteratorTest {
"Operations other than getKeys() are not supported on this
testing StateBackend.");
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return "test";
+ }
+
@Nonnull
@Override
public SavepointResources<Integer> savepoint() throws
UnsupportedOperationException {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
index 21fdf4b6e2f..c0275314574 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.v2.State;
@@ -140,4 +141,11 @@ public interface AsyncKeyedStateBackend<K>
@Override
void dispose();
+
+ /**
+ * @return fixed lower-case string identifying the type of the underlying
state backend, e.g.
+ * rocksdb, hashmap, or unknown.
+ */
+ @Experimental
+ String getBackendTypeIdentifier();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index f43e6b8ad0a..82cdb849d99 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -219,6 +219,15 @@ public class DefaultKeyedStateStore implements
KeyedStateStore {
}
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ if (keyedStateBackend != null) {
+ return keyedStateBackend.getBackendTypeIdentifier();
+ } else {
+ return asyncKeyedStateBackend.getBackendTypeIdentifier();
+ }
+ }
+
protected <S extends org.apache.flink.api.common.state.v2.State, SV> S
getPartitionedState(
org.apache.flink.api.common.state.v2.StateDescriptor<SV>
stateDescriptor)
throws Exception {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 19ef61b8ab5..d773fdb4a0a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -166,6 +167,13 @@ public interface KeyedStateBackend<K>
return false;
}
+ /**
+ * @return fixed lower-case string identifying the type of the underlying
state backend, e.g.
+ * rocksdb, hashmap, forst, batch.
+ */
+ @Experimental
+ String getBackendTypeIdentifier();
+
/** Listener is given a callback when {@link #setCurrentKey} is called
(key context changes). */
@FunctionalInterface
interface KeySelectionListener<K> {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index cf8e56dfba7..ddc3405f18c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
@@ -406,6 +407,11 @@ public class HeapKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K> {
return table.getKeysAndNamespaces();
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return StateBackendLoader.HASHMAP_STATE_BACKEND_NAME;
+ }
+
@Override
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS
createOrUpdateInternalState(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
index 3be4bb6ac7e..dbe88d1a2c7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
@@ -132,6 +132,11 @@ public class AsyncKeyedStateBackendAdaptor<K> implements
AsyncKeyedStateBackend<
@Override
public void dispose() {}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return keyedStateBackend.getBackendTypeIdentifier();
+ }
+
@Override
public void close() throws IOException {}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
index 15705acdcd4..1d7d18553ca 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
@@ -229,6 +229,11 @@ public class BatchExecutionKeyedStateBackend<K> implements
CheckpointableKeyedSt
return keySelectionListeners.remove(listener);
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return "batch";
+ }
+
@Nonnull
@Override
public <N, SV, SEV, S extends State, IS extends S> IS
createOrUpdateInternalState(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
index ab9486a3559..8fdaf601bf4 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
@@ -171,6 +171,11 @@ public class StateBackendTestUtils {
// do nothing
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return "test";
+ }
+
@Override
public void close() {
// do nothing
@@ -328,6 +333,11 @@ public class StateBackendTestUtils {
delegatedKeyedStateBackend.dispose();
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return "test";
+ }
+
@Override
public void close() throws IOException {
super.close();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index de3f30d4e79..18cfa86c1ca 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -236,6 +236,11 @@ public class MockKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K> {
(N)
namespace.getKey())));
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return "mock";
+ }
+
@Nonnull
@Override
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
index 965dd7239c0..aafbc453c76 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
@@ -213,6 +213,11 @@ public class AbstractKeyedStateTestBase {
public void dispose() {
// do nothing
}
+
+ @Override
+ public String getBackendTypeIdentifier() {
+ return "test";
+ }
};
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
index d63ba3ddbb3..c8ade06c888 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
@@ -176,6 +176,11 @@ public class TestStateBackend extends AbstractStateBackend
{
throw new UnsupportedOperationException();
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return "test";
+ }
+
@Nonnull
@Override
public <N, SV, SEV, S extends State, IS extends S> IS
createOrUpdateInternalState(
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 9cf61fc5a51..4182023900b 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -623,6 +623,11 @@ public class ChangelogKeyedStateBackend<K>
return keyedStateBackend.isSafeToReuseKVState();
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return keyedStateBackend.getBackendTypeIdentifier();
+ }
+
@Nonnull
@Override
public SavepointResources<K> savepoint() throws Exception {
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
index 6ccbb9a79c5..8814e61a81a 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
@@ -299,6 +299,11 @@ public class ChangelogMigrationRestoreTarget<K> implements
ChangelogRestoreTarge
keyedStateBackend.dispose();
changelogStateFactory.dispose();
}
+
+ @Override
+ public String getBackendTypeIdentifier() {
+ return keyedStateBackend.getBackendTypeIdentifier();
+ }
};
}
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 635d29b8c41..f48f63a1636 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
@@ -617,6 +618,11 @@ public class ForStKeyedStateBackend<K> implements
AsyncKeyedStateBackend<K> {
}
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return StateBackendLoader.FORST_STATE_BACKEND_NAME;
+ }
+
@Override
public boolean isSafeToReuseKVState() {
return true;
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index 0cd1312cc44..7feaa08eca5 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import org.apache.flink.runtime.state.StateBackendLoader;
import
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
@@ -952,6 +953,11 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
db.compactRange(kvStateInfo.columnFamilyHandle);
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return StateBackendLoader.FORST_STATE_BACKEND_NAME;
+ }
+
@Nonnegative
long getWriteBatchSize() {
return writeBatchSize;
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
index e52a3876599..0cf13b3384b 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import org.apache.flink.runtime.state.StateBackendLoader;
import
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
@@ -1110,6 +1111,11 @@ public class RocksDBKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K> {
return true;
}
+ @Override
+ public String getBackendTypeIdentifier() {
+ return StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;
+ }
+
/** Rocks DB specific information about the k/v states. */
public static class RocksDbKvStateInfo implements AutoCloseable {
public final ColumnFamilyHandle columnFamilyHandle;