This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e3adc2694fed657e9b0ef2b6682f1a76555b8d31 Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Oct 22 21:31:53 2025 +0200 [FLINK-35556] Harden RocksDBSharedResourcesFactoryTest --- .../rocksdb/RocksDBSharedResourcesFactory.java | 11 ++ .../rocksdb/RocksDBSharedResourcesFactoryTest.java | 190 ++++++++++++++------- 2 files changed, 140 insertions(+), 61 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBSharedResourcesFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBSharedResourcesFactory.java index 4130e3a187e..8ff4ed6c020 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBSharedResourcesFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBSharedResourcesFactory.java @@ -17,6 +17,7 @@ package org.apache.flink.state.rocksdb; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.MemorySize; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.OpaqueMemoryResource; @@ -97,6 +98,16 @@ enum RocksDBSharedResourcesFactory { this.shareScope = shareScope; } + @VisibleForTesting + public boolean isManaged() { + return managed; + } + + @VisibleForTesting + public MemoryShareScope getShareScope() { + return shareScope; + } + @Nullable public static RocksDBSharedResourcesFactory from( RocksDBMemoryConfiguration jobMemoryConfig, Environment env) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBSharedResourcesFactoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBSharedResourcesFactoryTest.java index 2b0f1b83d48..492719eb906 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBSharedResourcesFactoryTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBSharedResourcesFactoryTest.java @@ -24,13 +24,11 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; +import org.apache.flink.state.rocksdb.RocksDBMemoryControllerUtils.RocksDBMemoryFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,30 +36,35 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.util.Map; -import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; +import static org.apache.flink.contrib.streaming.state.RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE; +import static org.apache.flink.contrib.streaming.state.RocksDBOptions.FIX_PER_TM_MEMORY_SIZE; +import static org.apache.flink.contrib.streaming.state.RocksDBOptions.USE_MANAGED_MEMORY; +import static org.apache.flink.contrib.streaming.state.RocksDBOptions.WRITE_BUFFER_RATIO; import static org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution; import static org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded; +import static org.apache.flink.state.rocksdb.MemoryShareScope.SLOT; import static org.apache.flink.state.rocksdb.RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity; -import static org.apache.flink.state.rocksdb.RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE; -import static org.apache.flink.state.rocksdb.RocksDBOptions.FIX_PER_TM_MEMORY_SIZE; -import static org.apache.flink.state.rocksdb.RocksDBOptions.USE_MANAGED_MEMORY; -import static org.apache.flink.state.rocksdb.RocksDBOptions.WRITE_BUFFER_RATIO; import static org.apache.flink.state.rocksdb.RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED; import static org.apache.flink.state.rocksdb.RocksDBSharedResourcesFactory.SLOT_SHARED_UNMANAGED; import static org.apache.flink.state.rocksdb.RocksDBSharedResourcesFactory.TM_SHARED_UNMANAGED; import static org.apache.flink.util.CollectionUtil.entry; import static org.apache.flink.util.CollectionUtil.map; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.junit.jupiter.api.Assertions.assertSame; /** {@link RocksDBSharedResourcesFactory} test. */ +@SuppressWarnings({"rawtypes", "DataFlowIssue", "SameParameterValue"}) public class RocksDBSharedResourcesFactoryTest { private static final Logger LOG = LoggerFactory.getLogger(RocksDBSharedResourcesFactoryTest.class); + private static final MemorySize TM_SIZE = new MemorySize(20); + private static final MemorySize MANAGED_MEMORY_SIZE = new MemorySize(15); + public static final MemorySize PER_SLOT = new MemorySize(10); + @TempDir static Path tempDir; @BeforeAll @@ -69,52 +72,100 @@ public class RocksDBSharedResourcesFactoryTest { ensureRocksDBIsLoaded(tempDir.toAbsolutePath().toString()); } - private static Stream<Arguments> getSelectionStrategyParams() { - Map<Object, Object> defaults = emptyMap(); - - // format: job options, tm options, expected factory type - return Stream.of( - // default: per slot, managed - arguments(defaults, defaults, SLOT_SHARED_MANAGED), - // no sharing (allocate per column family), unmanaged - arguments(singletonMap(USE_MANAGED_MEMORY, false), defaults, null), - // prioritize managed (set explicitly) - arguments( - singletonMap(USE_MANAGED_MEMORY, true), - singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)), - SLOT_SHARED_MANAGED), - // prioritize managed (job default) - arguments( - defaults, - singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)), - SLOT_SHARED_MANAGED), - // prioritize fixed-per-slot over fixed-per-tm - arguments( - singletonMap(FIX_PER_SLOT_MEMORY_SIZE, MemorySize.ofMebiBytes(1)), - singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)), - SLOT_SHARED_UNMANAGED), - // prioritize fixed-per-slot over managed - arguments( - map( - entry(FIX_PER_SLOT_MEMORY_SIZE, MemorySize.ofMebiBytes(1)), - entry(USE_MANAGED_MEMORY, true)), - singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)), - SLOT_SHARED_UNMANAGED), - // use fixed-per-tm - when not managed and not fixed-per-slot - arguments( - singletonMap(USE_MANAGED_MEMORY, false), - singletonMap(FIX_PER_TM_MEMORY_SIZE, MemorySize.ofMebiBytes(1)), - TM_SHARED_UNMANAGED)); + @Test + public void testDefaults() throws Exception { + testSelectionStrategy( + MANAGED_MEMORY_SIZE, + emptyMap(), + emptyMap(), + SLOT, + MANAGED_MEMORY_SIZE, + true, + SLOT_SHARED_MANAGED); + } + + @Test + public void testDisableManaged() throws Exception { + testSelectionStrategy( + MANAGED_MEMORY_SIZE, + singletonMap(USE_MANAGED_MEMORY, false), + emptyMap(), + null, // everything else below is ignored + MemorySize.ZERO, + false, + null); + } + + @Test + public void testEnableManaged() throws Exception { + testSelectionStrategy( + MANAGED_MEMORY_SIZE, + singletonMap(USE_MANAGED_MEMORY, true), + singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE), // ignore + SLOT, + MANAGED_MEMORY_SIZE, + true, + SLOT_SHARED_MANAGED); + } + + @Test + public void testEnableManagedByDefault() throws Exception { + testSelectionStrategy( + MANAGED_MEMORY_SIZE, + emptyMap(), + singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE), // ignore + SLOT, + MANAGED_MEMORY_SIZE, + true, + SLOT_SHARED_MANAGED); + } + + @Test + public void testPrioritizeFixedPerSlotOverTm() throws Exception { + testSelectionStrategy( + MANAGED_MEMORY_SIZE, + singletonMap(FIX_PER_SLOT_MEMORY_SIZE, PER_SLOT), + singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE), + SLOT, + PER_SLOT, + false, + SLOT_SHARED_UNMANAGED); + } + + @Test + public void testPrioritizeFixedPerSlotOverManaged() throws Exception { + MemorySize perSlot = PER_SLOT; + testSelectionStrategy( + MANAGED_MEMORY_SIZE, + map(entry(FIX_PER_SLOT_MEMORY_SIZE, perSlot), entry(USE_MANAGED_MEMORY, true)), + singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE), + SLOT, + perSlot, + false, + SLOT_SHARED_UNMANAGED); + } + + @Test + public void testFixedPerTm() throws Exception { + testSelectionStrategy( + MANAGED_MEMORY_SIZE, + singletonMap(USE_MANAGED_MEMORY, false), + singletonMap(FIX_PER_TM_MEMORY_SIZE, TM_SIZE), + MemoryShareScope.TM, + MemorySize.ZERO, + false, + TM_SHARED_UNMANAGED); } - @ParameterizedTest(name = "jobConfig: {0}, tmConfig: {1}") - @MethodSource("getSelectionStrategyParams") - @SuppressWarnings("rawtypes") - public void testSelectionStrategy( + private void testSelectionStrategy( + MemorySize managedMemorySize, Map<ConfigOption, Object> jobOptions, Map<ConfigOption, Object> tmOptions, - RocksDBSharedResourcesFactory expected) - throws IOException { + MemoryShareScope expectedScope, + MemorySize expectedSize, + Boolean expectManaged, + RocksDBSharedResourcesFactory expectedFactory) + throws Exception { Configuration jobConfig = new Configuration(); jobOptions.forEach(jobConfig::set); @@ -122,10 +173,26 @@ public class RocksDBSharedResourcesFactoryTest { Configuration tmConfig = new Configuration(); tmOptions.forEach(tmConfig::set); - assertEquals( - expected, - RocksDBSharedResourcesFactory.from( - RocksDBMemoryConfiguration.fromConfiguration(jobConfig), getEnv(tmConfig))); + RocksDBMemoryConfiguration jobMemoryConfig = + RocksDBMemoryConfiguration.fromConfiguration(jobConfig); + RocksDBSharedResourcesFactory actualFactory = + RocksDBSharedResourcesFactory.from(jobMemoryConfig, getEnv(tmConfig)); + assertEquals(expectedFactory, actualFactory); + if (expectedScope == null) { + return; + } + assertEquals(expectManaged, actualFactory.isManaged()); + assertEquals(expectedScope, actualFactory.getShareScope()); + MockEnvironment env = + MockEnvironment.builder() + .setManagedMemorySize(managedMemorySize.getBytes()) + .build(); + OpaqueMemoryResource<RocksDBSharedResources> resource1 = + actualFactory.create(jobMemoryConfig, env, 1, LOG, RocksDBMemoryFactory.DEFAULT); + assertEquals(expectedSize.getBytes(), resource1.getSize()); + OpaqueMemoryResource<RocksDBSharedResources> resource2 = + actualFactory.create(jobMemoryConfig, env, 1, LOG, RocksDBMemoryFactory.DEFAULT); + assertSame(resource1.getResourceHandle(), resource2.getResourceHandle()); } @Test @@ -136,18 +203,19 @@ public class RocksDBSharedResourcesFactoryTest { tmConfig.set(FIX_PER_TM_MEMORY_SIZE, new MemorySize(size)); tmConfig.set(WRITE_BUFFER_RATIO, writeBufferRatio); - OpaqueMemoryResource<RocksDBSharedResources> resource = + try (OpaqueMemoryResource<RocksDBSharedResources> resource = TM_SHARED_UNMANAGED.create( RocksDBMemoryConfiguration.fromConfiguration(new Configuration()), getEnv(tmConfig), 0, // managed memory fraction must be ignored LOG, - RocksDBMemoryControllerUtils.RocksDBMemoryFactory.DEFAULT); + RocksDBMemoryFactory.DEFAULT)) { - assertEquals(size, resource.getSize()); - assertEquals( - calculateWriteBufferManagerCapacity(size, writeBufferRatio), - resource.getResourceHandle().getWriteBufferManagerCapacity()); + assertEquals(size, resource.getSize()); + assertEquals( + calculateWriteBufferManagerCapacity(size, writeBufferRatio), + resource.getResourceHandle().getWriteBufferManagerCapacity()); + } } private static Environment getEnv(Configuration tmConfig) throws IOException {
