This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ecc05305f2b352a91135e8962713657982eafadc
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Oct 22 21:31:53 2025 +0200

    [FLINK-35556] Harden RocksDBSharedResourcesFactoryTest
---
 .../state/RocksDBSharedResourcesFactory.java       |  11 ++
 .../state/RocksDBSharedResourcesFactoryTest.java   | 182 ++++++++++++++-------
 2 files changed, 136 insertions(+), 57 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java
index 002d634e7c0..cac17d66cf3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
@@ -97,6 +98,16 @@ enum RocksDBSharedResourcesFactory {
         this.shareScope = shareScope;
     }
 
+    @VisibleForTesting
+    public boolean isManaged() {
+        return managed;
+    }
+
+    @VisibleForTesting
+    public MemoryShareScope getShareScope() {
+        return shareScope;
+    }
+
     @Nullable
     public static RocksDBSharedResourcesFactory from(
             RocksDBMemoryConfiguration jobMemoryConfig, Environment env) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactoryTest.java
index 53249b5db90..f19abce6631 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactoryTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.RocksDBMemoryFactory;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -28,9 +29,6 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,11 +36,11 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Map;
-import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
 import static org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded;
+import static org.apache.flink.contrib.streaming.state.MemoryShareScope.SLOT;
 import static org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity;
 import static org.apache.flink.contrib.streaming.state.RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE;
 import static org.apache.flink.contrib.streaming.state.RocksDBOptions.FIX_PER_TM_MEMORY_SIZE;
@@ -55,13 +53,18 @@ import static org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.re
 import static org.apache.flink.util.CollectionUtil.entry;
 import static org.apache.flink.util.CollectionUtil.map;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.junit.jupiter.api.Assertions.assertSame;
 
 /** {@link RocksDBSharedResourcesFactory} test. */
+@SuppressWarnings({"rawtypes", "DataFlowIssue", "SameParameterValue"})
 public class RocksDBSharedResourcesFactoryTest {
     private static final Logger LOG =
             LoggerFactory.getLogger(RocksDBSharedResourcesFactoryTest.class);
 
+    private static final MemorySize TM_SIZE = new MemorySize(20);
+    private static final MemorySize MANAGED_MEMORY_SIZE = new MemorySize(15);
+    public static final MemorySize PER_SLOT = new MemorySize(10);
+
     @TempDir static Path tempDir;
 
     @BeforeAll
@@ -69,52 +72,100 @@ public class RocksDBSharedResourcesFactoryTest {
         ensureRocksDBIsLoaded(tempDir.toAbsolutePath().toString());
     }
 
-    private static Stream<Arguments> getSelectionStrategyParams() {
-        Map<Object, Object> defaults = emptyMap();
-
-        // format: job options, tm options, expected factory type
-        return Stream.of(
-                // default: per slot, managed
-                arguments(defaults, defaults, SLOT_SHARED_MANAGED),
-                // no sharing (allocate per column family), unmanaged
-                arguments(singletonMap(USE_MANAGED_MEMORY, false), defaults, null),
-                // prioritize managed (set explicitly)
-                arguments(
-                        singletonMap(USE_MANAGED_MEMORY, true),
-                        singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)),
-                        SLOT_SHARED_MANAGED),
-                // prioritize managed (job default)
-                arguments(
-                        defaults,
-                        singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)),
-                        SLOT_SHARED_MANAGED),
-                // prioritize fixed-per-slot over fixed-per-tm
-                arguments(
-                        singletonMap(FIX_PER_SLOT_MEMORY_SIZE, MemorySize.ofMebiBytes(1)),
-                        singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)),
-                        SLOT_SHARED_UNMANAGED),
-                // prioritize fixed-per-slot over managed
-                arguments(
-                        map(
-                                entry(FIX_PER_SLOT_MEMORY_SIZE, MemorySize.ofMebiBytes(1)),
-                                entry(USE_MANAGED_MEMORY, true)),
-                        singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)),
-                        SLOT_SHARED_UNMANAGED),
-                // use fixed-per-tm - when not managed and not fixed-per-slot
-                arguments(
-                        singletonMap(USE_MANAGED_MEMORY, false),
-                        singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)),
-                        TM_SHARED_UNMANAGED));
+    @Test
+    public void testDefaults() throws Exception {
+        testSelectionStrategy(
+                MANAGED_MEMORY_SIZE,
+                emptyMap(),
+                emptyMap(),
+                SLOT,
+                MANAGED_MEMORY_SIZE,
+                true,
+                SLOT_SHARED_MANAGED);
+    }
+
+    @Test
+    public void testDisableManaged() throws Exception {
+        testSelectionStrategy(
+                MANAGED_MEMORY_SIZE,
+                singletonMap(USE_MANAGED_MEMORY, false),
+                emptyMap(),
+                null, // everything else below is ignored
+                MemorySize.ZERO,
+                false,
+                null);
+    }
+
+    @Test
+    public void testEnableManaged() throws Exception {
+        testSelectionStrategy(
+                MANAGED_MEMORY_SIZE,
+                singletonMap(USE_MANAGED_MEMORY, true),
+                singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE), // ignore
+                SLOT,
+                MANAGED_MEMORY_SIZE,
+                true,
+                SLOT_SHARED_MANAGED);
+    }
+
+    @Test
+    public void testEnableManagedByDefault() throws Exception {
+        testSelectionStrategy(
+                MANAGED_MEMORY_SIZE,
+                emptyMap(),
+                singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE), // ignore
+                SLOT,
+                MANAGED_MEMORY_SIZE,
+                true,
+                SLOT_SHARED_MANAGED);
+    }
+
+    @Test
+    public void testPrioritizeFixedPerSlotOverTm() throws Exception {
+        testSelectionStrategy(
+                MANAGED_MEMORY_SIZE,
+                singletonMap(FIX_PER_SLOT_MEMORY_SIZE, PER_SLOT),
+                singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE),
+                SLOT,
+                PER_SLOT,
+                false,
+                SLOT_SHARED_UNMANAGED);
+    }
+
+    @Test
+    public void testPrioritizeFixedPerSlotOverManaged() throws Exception {
+        MemorySize perSlot = PER_SLOT;
+        testSelectionStrategy(
+                MANAGED_MEMORY_SIZE,
+                map(entry(FIX_PER_SLOT_MEMORY_SIZE, perSlot), entry(USE_MANAGED_MEMORY, true)),
+                singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE),
+                SLOT,
+                perSlot,
+                false,
+                SLOT_SHARED_UNMANAGED);
+    }
+
+    @Test
+    public void testFixedPerTm() throws Exception {
+        testSelectionStrategy(
+                MANAGED_MEMORY_SIZE,
+                singletonMap(USE_MANAGED_MEMORY, false),
+                singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE),
+                MemoryShareScope.TM,
+                MemorySize.ZERO,
+                false,
+                TM_SHARED_UNMANAGED);
     }
 
-    @ParameterizedTest(name = "jobConfig: {0}, tmConfig: {1}")
-    @MethodSource("getSelectionStrategyParams")
-    @SuppressWarnings("rawtypes")
-    public void testSelectionStrategy(
+    private void testSelectionStrategy(
+            MemorySize managedMemorySize,
             Map<ConfigOption, Object> jobOptions,
             Map<ConfigOption, Object> tmOptions,
-            RocksDBSharedResourcesFactory expected)
-            throws IOException {
+            MemoryShareScope expectedScope,
+            MemorySize expectedSize,
+            Boolean expectManaged,
+            RocksDBSharedResourcesFactory expectedFactory)
+            throws Exception {
 
         Configuration jobConfig = new Configuration();
         jobOptions.forEach(jobConfig::set);
@@ -122,10 +173,26 @@ public class RocksDBSharedResourcesFactoryTest {
         Configuration tmConfig = new Configuration();
         tmOptions.forEach(tmConfig::set);
 
-        assertEquals(
-                expected,
-                RocksDBSharedResourcesFactory.from(
-                        RocksDBMemoryConfiguration.fromConfiguration(jobConfig), getEnv(tmConfig)));
+        RocksDBMemoryConfiguration jobMemoryConfig =
+                RocksDBMemoryConfiguration.fromConfiguration(jobConfig);
+        RocksDBSharedResourcesFactory actualFactory =
+                RocksDBSharedResourcesFactory.from(jobMemoryConfig, getEnv(tmConfig));
+        assertEquals(expectedFactory, actualFactory);
+        if (expectedScope == null) {
+            return;
+        }
+        assertEquals(expectManaged, actualFactory.isManaged());
+        assertEquals(expectedScope, actualFactory.getShareScope());
+        MockEnvironment env =
+                MockEnvironment.builder()
+                        .setManagedMemorySize(managedMemorySize.getBytes())
+                        .build();
+        OpaqueMemoryResource<RocksDBSharedResources> resource1 =
+                actualFactory.create(jobMemoryConfig, env, 1, LOG, RocksDBMemoryFactory.DEFAULT);
+        assertEquals(expectedSize.getBytes(), resource1.getSize());
+        OpaqueMemoryResource<RocksDBSharedResources> resource2 =
+                actualFactory.create(jobMemoryConfig, env, 1, LOG, RocksDBMemoryFactory.DEFAULT);
+        assertSame(resource1.getResourceHandle(), resource2.getResourceHandle());
     }
 
     @Test
@@ -136,18 +203,19 @@ public class RocksDBSharedResourcesFactoryTest {
         tmConfig.set(FIX_PER_TM_MEMORY_SIZE, new MemorySize(size));
         tmConfig.set(WRITE_BUFFER_RATIO, writeBufferRatio);
 
-        OpaqueMemoryResource<RocksDBSharedResources> resource =
+        try (OpaqueMemoryResource<RocksDBSharedResources> resource =
                 TM_SHARED_UNMANAGED.create(
                         RocksDBMemoryConfiguration.fromConfiguration(new Configuration()),
                         getEnv(tmConfig),
                         0, // managed memory fraction must be ignored
                         LOG,
-                        RocksDBMemoryControllerUtils.RocksDBMemoryFactory.DEFAULT);
+                        RocksDBMemoryFactory.DEFAULT)) {
 
-        assertEquals(size, resource.getSize());
-        assertEquals(
-                calculateWriteBufferManagerCapacity(size, writeBufferRatio),
-                resource.getResourceHandle().getWriteBufferManagerCapacity());
+            assertEquals(size, resource.getSize());
+            assertEquals(
+                    calculateWriteBufferManagerCapacity(size, writeBufferRatio),
+                    resource.getResourceHandle().getWriteBufferManagerCapacity());
+        }
     }
 
     private static Environment getEnv(Configuration tmConfig) throws IOException {

Reply via email to