This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 162025a9e0d [FLINK-38558] Require copy when use heap timer (#27160)
162025a9e0d is described below
commit 162025a9e0d569e821ffde47722b56efe3d76416
Author: Zakelly <[email protected]>
AuthorDate: Fri Oct 31 20:44:19 2025 +0800
[FLINK-38558] Require copy when use heap timer (#27160)
---
...logDelegateEmbeddedRocksDBStateBackendTest.java | 10 ++++-----
.../forst/sync/ForStSyncKeyedStateBackend.java | 2 +-
.../flink/state/forst/ForStStateBackendTest.java | 20 +++++++++++++----
.../state/rocksdb/RocksDBKeyedStateBackend.java | 2 +-
.../rocksdb/EmbeddedRocksDBStateBackendTest.java | 26 +++++++++++++++++-----
5 files changed, 44 insertions(+), 16 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
index 168ca6e1052..1f94beb33f8 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
@@ -41,6 +41,8 @@ import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
+
/** Tests for {@link ChangelogStateBackend} delegating {@link
EmbeddedRocksDBStateBackend}. */
public class ChangelogDelegateEmbeddedRocksDBStateBackendTest
extends EmbeddedRocksDBStateBackendTest {
@@ -62,11 +64,6 @@ public class ChangelogDelegateEmbeddedRocksDBStateBackendTest
return false;
}
- @Override
- protected boolean isSafeToReuseKVState() {
- return true;
- }
-
@TestTemplate
@Disabled("The type of handle returned from snapshot() is not incremental")
public void testSharedIncrementalStateDeRegistration() {}
@@ -118,6 +115,9 @@ public class
ChangelogDelegateEmbeddedRocksDBStateBackendTest
@TestTemplate
public void testMaterializedRestorePriorityQueue() throws Exception {
+ assumeFalse(
+ useHeapTimer,
+ "Heap priority queue does not support restore test on managed
keyed state");
CheckpointStreamFactory streamFactory = createStreamFactory();
ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue(
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index f1aa35d0300..4f410ca4287 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -938,7 +938,7 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
@Override
public boolean isSafeToReuseKVState() {
- return true;
+ return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
}
@VisibleForTesting
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
index b3d8bc8e364..08945e75567 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
@@ -51,6 +51,7 @@ import java.util.List;
import static
org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES;
+import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the partitioned state part of {@link ForStStateBackendTest}. */
@@ -64,7 +65,8 @@ class ForStStateBackendTest extends
StateBackendTestBase<ForStStateBackend> {
new Object[][] {
{
(SupplierWithException<CheckpointStorage, IOException>)
- JobManagerCheckpointStorage::new
+ JobManagerCheckpointStorage::new,
+ false
},
{
(SupplierWithException<CheckpointStorage, IOException>)
@@ -73,12 +75,17 @@ class ForStStateBackendTest extends
StateBackendTestBase<ForStStateBackend> {
TempDirUtils.newFolder(tempFolder).toURI().toString();
return new FileSystemCheckpointStorage(
new Path(checkpointPath), 0, -1);
- }
+ },
+ true
}
});
}
- @Parameter public SupplierWithException<CheckpointStorage, IOException>
storageSupplier;
+ @Parameter(value = 0)
+ public SupplierWithException<CheckpointStorage, IOException>
storageSupplier;
+
+ @Parameter(value = 1)
+ public boolean useHeapTimer;
@Override
protected CheckpointStorage getCheckpointStorage() throws Exception {
@@ -91,6 +98,11 @@ class ForStStateBackendTest extends
StateBackendTestBase<ForStStateBackend> {
Configuration config = new Configuration();
config.set(LOCAL_DIRECTORIES, tempFolder.toString());
config.set(USE_INGEST_DB_RESTORE_MODE, true);
+ config.set(
+ TIMER_SERVICE_FACTORY,
+ useHeapTimer
+ ? ForStStateBackend.PriorityQueueStateType.HEAP
+ : ForStStateBackend.PriorityQueueStateType.ForStDB);
return backend.configure(config,
Thread.currentThread().getContextClassLoader());
}
@@ -109,7 +121,7 @@ class ForStStateBackendTest extends
StateBackendTestBase<ForStStateBackend> {
*/
@Override
protected boolean isSafeToReuseKVState() {
- return true;
+ return !useHeapTimer;
}
@TestTemplate
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
index 0cf13b3384b..b77232cb067 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
@@ -1108,7 +1108,7 @@ public class RocksDBKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K> {
@Override
public boolean isSafeToReuseKVState() {
- return true;
+ return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
}
@Override
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java
index b188f9afae5..6e40503c2ad 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java
@@ -99,6 +99,7 @@ import java.util.stream.Collectors;
import static
org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
import static
org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
import static
org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
+import static
org.apache.flink.state.rocksdb.RocksDBOptions.TIMER_SERVICE_FACTORY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -131,13 +132,15 @@ public class EmbeddedRocksDBStateBackendTest
true,
(SupplierWithException<CheckpointStorage, IOException>)
JobManagerCheckpointStorage::new,
+ false,
false
},
{
true,
(SupplierWithException<CheckpointStorage, IOException>)
JobManagerCheckpointStorage::new,
- true
+ true,
+ false
},
{
false,
@@ -148,8 +151,16 @@ public class EmbeddedRocksDBStateBackendTest
return new FileSystemCheckpointStorage(
new Path(checkpointPath), 0, -1);
},
+ false,
false
- }
+ },
+ {
+ true,
+ (SupplierWithException<CheckpointStorage, IOException>)
+ JobManagerCheckpointStorage::new,
+ false,
+ true
+ },
});
}
@@ -162,6 +173,9 @@ public class EmbeddedRocksDBStateBackendTest
@Parameter(value = 2)
public boolean useIngestDB;
+ @Parameter(value = 3)
+ public boolean useHeapTimer;
+
// Store it because we need it for the cleanup test.
private String dbPath;
private RocksDB db = null;
@@ -203,8 +217,10 @@ public class EmbeddedRocksDBStateBackendTest
Configuration configuration = new Configuration();
configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
configuration.set(
- RocksDBOptions.TIMER_SERVICE_FACTORY,
- EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
+ TIMER_SERVICE_FACTORY,
+ useHeapTimer
+ ?
EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP
+ :
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL,
Duration.ofMillis(1));
return configuration;
}
@@ -234,7 +250,7 @@ public class EmbeddedRocksDBStateBackendTest
@Override
protected boolean isSafeToReuseKVState() {
- return true;
+ return !useHeapTimer;
}
// small safety net for instance cleanups, so that no native objects are
left