This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 162025a9e0d [FLINK-38558] Require copy when use heap timer (#27160)
162025a9e0d is described below

commit 162025a9e0d569e821ffde47722b56efe3d76416
Author: Zakelly <[email protected]>
AuthorDate: Fri Oct 31 20:44:19 2025 +0800

    [FLINK-38558] Require copy when use heap timer (#27160)
---
 ...logDelegateEmbeddedRocksDBStateBackendTest.java | 10 ++++-----
 .../forst/sync/ForStSyncKeyedStateBackend.java     |  2 +-
 .../flink/state/forst/ForStStateBackendTest.java   | 20 +++++++++++++----
 .../state/rocksdb/RocksDBKeyedStateBackend.java    |  2 +-
 .../rocksdb/EmbeddedRocksDBStateBackendTest.java   | 26 +++++++++++++++++-----
 5 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
index 168ca6e1052..1f94beb33f8 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
@@ -41,6 +41,8 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.time.Duration;
 
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
+
 /** Tests for {@link ChangelogStateBackend} delegating {@link EmbeddedRocksDBStateBackend}. */
 public class ChangelogDelegateEmbeddedRocksDBStateBackendTest
         extends EmbeddedRocksDBStateBackendTest {
@@ -62,11 +64,6 @@ public class ChangelogDelegateEmbeddedRocksDBStateBackendTest
         return false;
     }
 
-    @Override
-    protected boolean isSafeToReuseKVState() {
-        return true;
-    }
-
     @TestTemplate
     @Disabled("The type of handle returned from snapshot() is not incremental")
     public void testSharedIncrementalStateDeRegistration() {}
@@ -118,6 +115,9 @@ public class ChangelogDelegateEmbeddedRocksDBStateBackendTest
 
     @TestTemplate
     public void testMaterializedRestorePriorityQueue() throws Exception {
+        assumeFalse(
+                useHeapTimer,
+                "Heap priority queue does not support restore test on managed keyed state");
         CheckpointStreamFactory streamFactory = createStreamFactory();
 
         ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue(
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index f1aa35d0300..4f410ca4287 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -938,7 +938,7 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
 
     @Override
     public boolean isSafeToReuseKVState() {
-        return true;
+        return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
     }
 
     @VisibleForTesting
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
index b3d8bc8e364..08945e75567 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
@@ -51,6 +51,7 @@ import java.util.List;
 
 import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES;
+import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the partitioned state part of {@link ForStStateBackendTest}. */
@@ -64,7 +65,8 @@ class ForStStateBackendTest extends StateBackendTestBase<ForStStateBackend> {
                 new Object[][] {
                     {
                         (SupplierWithException<CheckpointStorage, IOException>)
-                                JobManagerCheckpointStorage::new
+                                JobManagerCheckpointStorage::new,
+                        false
                     },
                     {
                         (SupplierWithException<CheckpointStorage, IOException>)
@@ -73,12 +75,17 @@ class ForStStateBackendTest extends StateBackendTestBase<ForStStateBackend> {
                                             TempDirUtils.newFolder(tempFolder).toURI().toString();
                                     return new FileSystemCheckpointStorage(
                                             new Path(checkpointPath), 0, -1);
-                                }
+                                },
+                        true
                     }
                 });
     }
 
-    @Parameter public SupplierWithException<CheckpointStorage, IOException> storageSupplier;
+    @Parameter(value = 0)
+    public SupplierWithException<CheckpointStorage, IOException> storageSupplier;
+
+    @Parameter(value = 1)
+    public boolean useHeapTimer;
 
     @Override
     protected CheckpointStorage getCheckpointStorage() throws Exception {
@@ -91,6 +98,11 @@ class ForStStateBackendTest extends StateBackendTestBase<ForStStateBackend> {
         Configuration config = new Configuration();
         config.set(LOCAL_DIRECTORIES, tempFolder.toString());
         config.set(USE_INGEST_DB_RESTORE_MODE, true);
+        config.set(
+                TIMER_SERVICE_FACTORY,
+                useHeapTimer
+                        ? ForStStateBackend.PriorityQueueStateType.HEAP
+                        : ForStStateBackend.PriorityQueueStateType.ForStDB);
         return backend.configure(config, Thread.currentThread().getContextClassLoader());
     }
 
@@ -109,7 +121,7 @@ class ForStStateBackendTest extends StateBackendTestBase<ForStStateBackend> {
      */
     @Override
     protected boolean isSafeToReuseKVState() {
-        return true;
+        return !useHeapTimer;
     }
 
     @TestTemplate
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
index 0cf13b3384b..b77232cb067 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
@@ -1108,7 +1108,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
     @Override
     public boolean isSafeToReuseKVState() {
-        return true;
+        return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java
index b188f9afae5..6e40503c2ad 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java
@@ -99,6 +99,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
 import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
 import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
+import static org.apache.flink.state.rocksdb.RocksDBOptions.TIMER_SERVICE_FACTORY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -131,13 +132,15 @@ public class EmbeddedRocksDBStateBackendTest
                         true,
                         (SupplierWithException<CheckpointStorage, IOException>)
                                 JobManagerCheckpointStorage::new,
+                        false,
                         false
                     },
                     {
                         true,
                         (SupplierWithException<CheckpointStorage, IOException>)
                                 JobManagerCheckpointStorage::new,
-                        true
+                        true,
+                        false
                     },
                     {
                         false,
@@ -148,8 +151,16 @@ public class EmbeddedRocksDBStateBackendTest
                                     return new FileSystemCheckpointStorage(
                                             new Path(checkpointPath), 0, -1);
                                 },
+                        false,
                         false
-                    }
+                    },
+                    {
+                        true,
+                        (SupplierWithException<CheckpointStorage, IOException>)
+                                JobManagerCheckpointStorage::new,
+                        false,
+                        true
+                    },
                 });
     }
 
@@ -162,6 +173,9 @@ public class EmbeddedRocksDBStateBackendTest
     @Parameter(value = 2)
     public boolean useIngestDB;
 
+    @Parameter(value = 3)
+    public boolean useHeapTimer;
+
     // Store it because we need it for the cleanup test.
     private String dbPath;
     private RocksDB db = null;
@@ -203,8 +217,10 @@ public class EmbeddedRocksDBStateBackendTest
         Configuration configuration = new Configuration();
         configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
         configuration.set(
-                RocksDBOptions.TIMER_SERVICE_FACTORY,
-                EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
+                TIMER_SERVICE_FACTORY,
+                useHeapTimer
+                        ? EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP
+                        : EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
         configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL, Duration.ofMillis(1));
         return configuration;
     }
@@ -234,7 +250,7 @@ public class EmbeddedRocksDBStateBackendTest
 
     @Override
     protected boolean isSafeToReuseKVState() {
-        return true;
+        return !useHeapTimer;
     }
 
     // small safety net for instance cleanups, so that no native objects are left

Reply via email to