This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5be8b3e1dfdb5a009677b2e194def2ad3b1122ec
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Tue Feb 27 10:48:40 2024 +0100

    [hotfix] Fix configuration through TernaryBoolean in EmbeddedRocksDBStateBackend.
---
 .../java/org/apache/flink/util/TernaryBoolean.java | 18 +++++++
 .../state/EmbeddedRocksDBStateBackend.java         | 21 ++++----
 .../state/EmbeddedRocksDBStateBackendTest.java     | 57 ++++++++++++++++++++--
 .../state/RocksDBStateBackendConfigTest.java       | 25 ++++++++++
 4 files changed, 107 insertions(+), 14 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java b/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
index 4cfd71ac238..4f06c15e9f8 100644
--- a/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
+++ b/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
@@ -19,6 +19,8 @@
 package org.apache.flink.util;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
 
 import javax.annotation.Nullable;
 
@@ -71,6 +73,22 @@ public enum TernaryBoolean {
         return this == UNDEFINED ? null : (this == TRUE ? Boolean.TRUE : 
Boolean.FALSE);
     }
 
+    /**
+     * Merges an existing value with a config, accepting the config's value 
only if the existing
+     * value is undefined.
+     *
+     * @param original the value to merge with the config.
+     * @param configOption the config option to merge with from the config.
+     * @param config the config to merge with.
+     */
+    public static TernaryBoolean mergeTernaryBooleanWithConfig(
+            TernaryBoolean original, ConfigOption<Boolean> configOption, 
ReadableConfig config) {
+        if (original != TernaryBoolean.UNDEFINED) {
+            return original;
+        }
+        return 
TernaryBoolean.fromBoxedBoolean(config.getOptional(configOption).orElse(null));
+    }
+
     // ------------------------------------------------------------------------
 
     /**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index 1a489fad8c8..baca5f4a013 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -321,21 +321,20 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                 "Overlap fraction threshold of restoring should be between 0 
and 1");
 
         incrementalRestoreAsyncCompactAfterRescale =
-                original.incrementalRestoreAsyncCompactAfterRescale == 
TernaryBoolean.UNDEFINED
-                        ? TernaryBoolean.fromBoxedBoolean(
-                                
config.get(INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE))
-                        : original.incrementalRestoreAsyncCompactAfterRescale;
+                TernaryBoolean.mergeTernaryBooleanWithConfig(
+                        original.incrementalRestoreAsyncCompactAfterRescale,
+                        INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
+                        config);
 
         useIngestDbRestoreMode =
-                original.useIngestDbRestoreMode == TernaryBoolean.UNDEFINED
-                        ? 
TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE))
-                        : 
TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode());
+                TernaryBoolean.mergeTernaryBooleanWithConfig(
+                        original.useIngestDbRestoreMode, 
USE_INGEST_DB_RESTORE_MODE, config);
 
         rescalingUseDeleteFilesInRange =
-                original.rescalingUseDeleteFilesInRange == 
TernaryBoolean.UNDEFINED
-                        ? TernaryBoolean.fromBoxedBoolean(
-                                
config.get(USE_DELETE_FILES_IN_RANGE_DURING_RESCALING))
-                        : original.rescalingUseDeleteFilesInRange;
+                TernaryBoolean.mergeTernaryBooleanWithConfig(
+                        original.rescalingUseDeleteFilesInRange,
+                        USE_DELETE_FILES_IN_RANGE_DURING_RESCALING,
+                        config);
 
         this.rocksDBMemoryFactory = original.rocksDBMemoryFactory;
     }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index 5feeeaf5608..41773e4be28 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -92,9 +93,12 @@ import java.util.Queue;
 import java.util.concurrent.RunnableFuture;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.reset;
@@ -186,14 +190,19 @@ public class EmbeddedRocksDBStateBackendTest
         dbPath = TempDirUtils.newFolder(tempFolder).getAbsolutePath();
         EmbeddedRocksDBStateBackend backend =
                 new 
EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing);
+        Configuration configuration = createBackendConfig();
+        backend = backend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
+        backend.setDbStoragePath(dbPath);
+        return backend;
+    }
+
+    private Configuration createBackendConfig() {
         Configuration configuration = new Configuration();
         configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
         configuration.set(
                 RocksDBOptions.TIMER_SERVICE_FACTORY,
                 EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
-        backend = backend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
-        backend.setDbStoragePath(dbPath);
-        return backend;
+        return configuration;
     }
 
     @Override
@@ -656,6 +665,48 @@ public class EmbeddedRocksDBStateBackendTest
         
assertThatThrownBy(state::clear).isInstanceOf(FlinkRuntimeException.class);
     }
 
+    /** Test for all configs that use {@link 
org.apache.flink.util.TernaryBoolean}. */
+    @TestTemplate
+    public void testConfigureTernaryBooleanConfigs() throws Exception {
+        ConfigurableStateBackend stateBackend = getStateBackend();
+        if (!(stateBackend instanceof EmbeddedRocksDBStateBackend)) {
+            return;
+        }
+        EmbeddedRocksDBStateBackend rocksDBStateBackend =
+                (EmbeddedRocksDBStateBackend) stateBackend;
+        Configuration baseConfig = createBackendConfig();
+        Configuration testConfig = new Configuration();
+        testConfig.setBoolean(
+                USE_INGEST_DB_RESTORE_MODE, 
!USE_INGEST_DB_RESTORE_MODE.defaultValue());
+        testConfig.setBoolean(
+                INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
+                
!INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue());
+        testConfig.setBoolean(
+                USE_DELETE_FILES_IN_RANGE_DURING_RESCALING,
+                !USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue());
+        EmbeddedRocksDBStateBackend configuredBackend =
+                rocksDBStateBackend.configure(
+                        testConfig, 
Thread.currentThread().getContextClassLoader());
+
+        checkBooleanWithBaseConf(
+                baseConfig,
+                USE_INGEST_DB_RESTORE_MODE,
+                configuredBackend.getUseIngestDbRestoreMode());
+        checkBooleanWithBaseConf(
+                baseConfig,
+                USE_DELETE_FILES_IN_RANGE_DURING_RESCALING,
+                configuredBackend.isRescalingUseDeleteFilesInRange());
+        checkBooleanWithBaseConf(
+                baseConfig,
+                INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
+                
configuredBackend.getIncrementalRestoreAsyncCompactAfterRescale());
+    }
+
+    private void checkBooleanWithBaseConf(
+            Configuration testConfig, ConfigOption<Boolean> option, boolean 
value) {
+        
assertEquals(testConfig.getOptional(option).orElse(!option.defaultValue()), 
value);
+    }
+
     private void runStateUpdates() throws Exception {
         for (int i = 50; i < 150; ++i) {
             if (i % 10 == 0) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index d54210e727e..79869038bd7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -890,6 +890,31 @@ public class RocksDBStateBackendConfigTest {
         assertTrue(rocksDBStateBackend.getUseIngestDbRestoreMode());
     }
 
+    @Test
+    public void testDefaultUseDeleteFilesInRange() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new 
EmbeddedRocksDBStateBackend(true);
+        assertEquals(
+                
RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING
+                        .defaultValue(),
+                rocksDBStateBackend.isRescalingUseDeleteFilesInRange());
+    }
+
+    @Test
+    public void testConfigureUseFilesInRange() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new 
EmbeddedRocksDBStateBackend(true);
+        Configuration configuration = new Configuration();
+        configuration.set(
+                
RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING,
+                
!RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING
+                        .defaultValue());
+        rocksDBStateBackend =
+                rocksDBStateBackend.configure(configuration, 
getClass().getClassLoader());
+        assertEquals(
+                
!RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING
+                        .defaultValue(),
+                rocksDBStateBackend.isRescalingUseDeleteFilesInRange());
+    }
+
     @Test
     public void testDefaultIncrementalRestoreInstanceBufferSize() {
         EmbeddedRocksDBStateBackend rocksDBStateBackend = new 
EmbeddedRocksDBStateBackend(true);

Reply via email to