This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d567a9dee4d [FLINK-38462] Introduce state backend type identifier
d567a9dee4d is described below

commit d567a9dee4d54f0401058e7c07081cf05adbd2c1
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Sep 30 16:33:19 2025 +0200

    [FLINK-38462] Introduce state backend type identifier
---
 .../org/apache/flink/api/common/state/KeyedStateStore.java     |  7 +++++++
 .../test/java/org/apache/flink/cep/utils/TestSharedBuffer.java |  5 +++++
 .../flink/state/api/input/MultiStateKeyIteratorTest.java       |  5 +++++
 .../org/apache/flink/runtime/state/AsyncKeyedStateBackend.java |  8 ++++++++
 .../org/apache/flink/runtime/state/DefaultKeyedStateStore.java |  9 +++++++++
 .../java/org/apache/flink/runtime/state/KeyedStateBackend.java |  8 ++++++++
 .../apache/flink/runtime/state/heap/HeapKeyedStateBackend.java |  6 ++++++
 .../state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java        |  5 +++++
 .../sorted/state/BatchExecutionKeyedStateBackend.java          |  5 +++++
 .../org/apache/flink/runtime/state/StateBackendTestUtils.java  | 10 ++++++++++
 .../flink/runtime/state/ttl/mock/MockKeyedStateBackend.java    |  5 +++++
 .../flink/runtime/state/v2/AbstractKeyedStateTestBase.java     |  5 +++++
 .../apache/flink/streaming/runtime/tasks/TestStateBackend.java |  5 +++++
 .../flink/state/changelog/ChangelogKeyedStateBackend.java      |  5 +++++
 .../changelog/restore/ChangelogMigrationRestoreTarget.java     |  5 +++++
 .../org/apache/flink/state/forst/ForStKeyedStateBackend.java   |  6 ++++++
 .../flink/state/forst/sync/ForStSyncKeyedStateBackend.java     |  6 ++++++
 .../apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java   |  6 ++++++
 18 files changed, 111 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 69c513c388a..a9f6dbda223 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -338,4 +338,11 @@ public interface KeyedStateStore {
                             
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<
                                             IN, ACC, OUT>
                                     stateProperties);
+
+    /**
+     * @return fixed lower-case string identifying the type of the underlying 
state backend, e.g.
+     *     rocksdb, hashmap, forst, batch.
+     */
+    @Experimental
+    String getBackendTypeIdentifier();
 }
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 1676c3de5c4..a741ad07512 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -287,6 +287,11 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> {
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public String getBackendTypeIdentifier() {
+            return "mock";
+        }
+
         private class CountingIterator<T> implements Iterator<T> {
 
             private final Iterator<T> iterator;
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
index 2045cc81e66..260df82dfa4 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java
@@ -324,6 +324,11 @@ public class MultiStateKeyIteratorTest {
                     "Operations other than getKeys() are not supported on this 
testing StateBackend.");
         }
 
+        @Override
+        public String getBackendTypeIdentifier() {
+            return "test";
+        }
+
         @Nonnull
         @Override
         public SavepointResources<Integer> savepoint() throws 
UnsupportedOperationException {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
index 21fdf4b6e2f..c0275314574 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.InternalCheckpointListener;
 import org.apache.flink.api.common.state.v2.State;
@@ -140,4 +141,11 @@ public interface AsyncKeyedStateBackend<K>
 
     @Override
     void dispose();
+
+    /**
+     * @return fixed lower-case string identifying the type of the underlying 
state backend, e.g.
+     *     rocksdb, hashmap, or unknown.
+     */
+    @Experimental
+    String getBackendTypeIdentifier();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index f43e6b8ad0a..82cdb849d99 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -219,6 +219,15 @@ public class DefaultKeyedStateStore implements 
KeyedStateStore {
         }
     }
 
+    @Override
+    public String getBackendTypeIdentifier() {
+        if (keyedStateBackend != null) {
+            return keyedStateBackend.getBackendTypeIdentifier();
+        } else {
+            return asyncKeyedStateBackend.getBackendTypeIdentifier();
+        }
+    }
+
     protected <S extends org.apache.flink.api.common.state.v2.State, SV> S 
getPartitionedState(
             org.apache.flink.api.common.state.v2.StateDescriptor<SV> 
stateDescriptor)
             throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 19ef61b8ab5..d773fdb4a0a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -166,6 +167,13 @@ public interface KeyedStateBackend<K>
         return false;
     }
 
+    /**
+     * @return fixed lower-case string identifying the type of the underlying 
state backend, e.g.
+     *     rocksdb, hashmap, forst, batch.
+     */
+    @Experimental
+    String getBackendTypeIdentifier();
+
     /** Listener is given a callback when {@link #setCurrentKey} is called 
(key context changes). */
     @FunctionalInterface
     interface KeySelectionListener<K> {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index cf8e56dfba7..ddc3405f18c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.SnapshotExecutionType;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.runtime.state.StateEntry;
 import org.apache.flink.runtime.state.StateSnapshotRestore;
 import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
@@ -406,6 +407,11 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         return table.getKeysAndNamespaces();
     }
 
+    @Override
+    public String getBackendTypeIdentifier() {
+        return StateBackendLoader.HASHMAP_STATE_BACKEND_NAME;
+    }
+
     @Override
     @Nonnull
     public <N, SV, SEV, S extends State, IS extends S> IS 
createOrUpdateInternalState(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
index 3be4bb6ac7e..dbe88d1a2c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
@@ -132,6 +132,11 @@ public class AsyncKeyedStateBackendAdaptor<K> implements 
AsyncKeyedStateBackend<
     @Override
     public void dispose() {}
 
+    @Override
+    public String getBackendTypeIdentifier() {
+        return keyedStateBackend.getBackendTypeIdentifier();
+    }
+
     @Override
     public void close() throws IOException {}
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
index 15705acdcd4..1d7d18553ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
@@ -229,6 +229,11 @@ public class BatchExecutionKeyedStateBackend<K> implements 
CheckpointableKeyedSt
         return keySelectionListeners.remove(listener);
     }
 
+    @Override
+    public String getBackendTypeIdentifier() {
+        return "batch";
+    }
+
     @Nonnull
     @Override
     public <N, SV, SEV, S extends State, IS extends S> IS 
createOrUpdateInternalState(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
index ab9486a3559..8fdaf601bf4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
@@ -171,6 +171,11 @@ public class StateBackendTestUtils {
             // do nothing
         }
 
+        @Override
+        public String getBackendTypeIdentifier() {
+            return "test";
+        }
+
         @Override
         public void close() {
             // do nothing
@@ -328,6 +333,11 @@ public class StateBackendTestUtils {
                     delegatedKeyedStateBackend.dispose();
                 }
 
+                @Override
+                public String getBackendTypeIdentifier() {
+                    return "test";
+                }
+
                 @Override
                 public void close() throws IOException {
                     super.close();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index de3f30d4e79..18cfa86c1ca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -236,6 +236,11 @@ public class MockKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                                 (N) 
namespace.getKey())));
     }
 
+    @Override
+    public String getBackendTypeIdentifier() {
+        return "mock";
+    }
+
     @Nonnull
     @Override
     public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
index 965dd7239c0..aafbc453c76 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
@@ -213,6 +213,11 @@ public class AbstractKeyedStateTestBase {
                 public void dispose() {
                     // do nothing
                 }
+
+                @Override
+                public String getBackendTypeIdentifier() {
+                    return "test";
+                }
             };
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
index d63ba3ddbb3..c8ade06c888 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java
@@ -176,6 +176,11 @@ public class TestStateBackend extends AbstractStateBackend 
{
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public String getBackendTypeIdentifier() {
+            return "test";
+        }
+
         @Nonnull
         @Override
         public <N, SV, SEV, S extends State, IS extends S> IS 
createOrUpdateInternalState(
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 9cf61fc5a51..4182023900b 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -623,6 +623,11 @@ public class ChangelogKeyedStateBackend<K>
         return keyedStateBackend.isSafeToReuseKVState();
     }
 
+    @Override
+    public String getBackendTypeIdentifier() {
+        return keyedStateBackend.getBackendTypeIdentifier();
+    }
+
     @Nonnull
     @Override
     public SavepointResources<K> savepoint() throws Exception {
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
index 6ccbb9a79c5..8814e61a81a 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
@@ -299,6 +299,11 @@ public class ChangelogMigrationRestoreTarget<K> implements 
ChangelogRestoreTarge
                 keyedStateBackend.dispose();
                 changelogStateFactory.dispose();
             }
+
+            @Override
+            public String getBackendTypeIdentifier() {
+                return keyedStateBackend.getBackendTypeIdentifier();
+            }
         };
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 635d29b8c41..f48f63a1636 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.PriorityQueueSetFactory;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
@@ -617,6 +618,11 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
         }
     }
 
+    @Override
+    public String getBackendTypeIdentifier() {
+        return StateBackendLoader.FORST_STATE_BACKEND_NAME;
+    }
+
     @Override
     public boolean isSafeToReuseKVState() {
         return true;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index 0cd1312cc44..7feaa08eca5 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.SavepointResources;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
@@ -952,6 +953,11 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
         db.compactRange(kvStateInfo.columnFamilyHandle);
     }
 
+    @Override
+    public String getBackendTypeIdentifier() {
+        return StateBackendLoader.FORST_STATE_BACKEND_NAME;
+    }
+
     @Nonnegative
     long getWriteBatchSize() {
         return writeBatchSize;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
index e52a3876599..0cf13b3384b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.SavepointResources;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
@@ -1110,6 +1111,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         return true;
     }
 
+    @Override
+    public String getBackendTypeIdentifier() {
+        return StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;
+    }
+
     /** Rocks DB specific information about the k/v states. */
     public static class RocksDbKvStateInfo implements AutoCloseable {
         public final ColumnFamilyHandle columnFamilyHandle;

Reply via email to