This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 78dfe14b5c9 [FLINK-33947][configuration] Fix bugs about prefix attribute in the DelegatingConfiguration class.
78dfe14b5c9 is described below

commit 78dfe14b5c9ff495fd38beb7c2d7f88b341f8c20
Author: Roc Marshal <flin...@126.com>
AuthorDate: Wed Dec 27 23:35:29 2023 +0800

    [FLINK-33947][configuration] Fix bugs about prefix attribute in the DelegatingConfiguration class.
---
 .../configuration/DelegatingConfiguration.java     | 17 +++++------
 .../configuration/DelegatingConfigurationTest.java | 33 +++++++++++++++++++---
 2 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index b9eaad709a1..84c91483fc5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -22,6 +22,8 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -44,14 +46,13 @@ public final class DelegatingConfiguration extends Configuration {
 
     private final Configuration backingConfig; // the configuration actually storing the data
 
-    private String prefix; // the prefix key by which keys for this config are marked
+    @Nonnull private String prefix; // the prefix key by which keys for this config are marked
 
     // --------------------------------------------------------------------------------------------
 
     /** Default constructor for serialization. Creates an empty delegating configuration. */
     public DelegatingConfiguration() {
-        this.backingConfig = new Configuration();
-        this.prefix = "";
+        this(new Configuration(), "");
     }
 
     /**
@@ -63,7 +64,7 @@ public final class DelegatingConfiguration extends Configuration {
      */
     public DelegatingConfiguration(Configuration backingConfig, String prefix) {
         this.backingConfig = Preconditions.checkNotNull(backingConfig);
-        this.prefix = prefix;
+        this.prefix = Preconditions.checkNotNull(prefix, "The 'prefix' attribute mustn't be null.");
     }
 
     // --------------------------------------------------------------------------------------------
@@ -285,7 +286,7 @@ public final class DelegatingConfiguration extends Configuration {
 
     @Override
     public Set<String> keySet() {
-        if (this.prefix == null) {
+        if (this.prefix.isEmpty()) {
             return this.backingConfig.keySet();
         }
 
@@ -321,12 +322,12 @@ public final class DelegatingConfiguration extends Configuration {
 
     @Override
     public <T> boolean removeConfig(ConfigOption<T> configOption) {
-        return backingConfig.removeConfig(configOption);
+        return backingConfig.removeConfig(prefixOption(configOption, prefix));
     }
 
     @Override
     public boolean removeKey(String key) {
-        return backingConfig.removeKey(key);
+        return backingConfig.removeKey(prefix + key);
     }
 
     @Override
@@ -359,7 +360,7 @@ public final class DelegatingConfiguration extends Configuration {
 
     @Override
     public void read(DataInputView in) throws IOException {
-        this.prefix = in.readUTF();
+        this.prefix = Preconditions.checkNotNull(in.readUTF());
         this.backingConfig.read(in);
     }
 
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
index 59cb4718f9c..d7ad7ec90ed 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import java.util.Set;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link DelegatingConfiguration}. */
 class DelegatingConfigurationTest {
@@ -78,14 +79,15 @@ class DelegatingConfigurationTest {
     }
 
     @Test
-    void testDelegationConfigurationWithNullPrefix() {
+    void testDelegationConfigurationWithNullOrEmptyPrefix() {
         Configuration backingConf = new Configuration();
         backingConf.setValueInternal("test-key", "value", false);
 
-        DelegatingConfiguration configuration = new DelegatingConfiguration(backingConf, null);
-        Set<String> keySet = configuration.keySet();
+        assertThatThrownBy(() -> new DelegatingConfiguration(backingConf, null))
+                .isInstanceOf(NullPointerException.class);
 
-        assertThat(backingConf.keySet()).isEqualTo(keySet);
+        DelegatingConfiguration configuration = new DelegatingConfiguration(backingConf, "");
+        assertThat(backingConf.keySet()).isEqualTo(configuration.keySet());
     }
 
     @Test
@@ -191,4 +193,27 @@ class DelegatingConfigurationTest {
         delegatingConf.setBoolean(booleanOption, false);
         assertThat(delegatingConf.getBoolean(booleanOption, true)).isEqualTo(false);
     }
+
+    @Test
+    void testRemoveKeyOrConfig() {
+        Configuration original = new Configuration();
+        final DelegatingConfiguration delegatingConf =
+                new DelegatingConfiguration(original, "prefix.");
+        ConfigOption<Integer> integerOption =
+                ConfigOptions.key("integer.key").intType().noDefaultValue();
+
+        // Test for removeConfig
+        delegatingConf.set(integerOption, 0);
+        assertThat(delegatingConf.get(integerOption)).isZero();
+        delegatingConf.removeConfig(integerOption);
+        assertThat(delegatingConf.getOptional(integerOption)).isEmpty();
+        assertThat(delegatingConf.getInteger(integerOption.key(), 0)).isZero();
+
+        // Test for removeKey
+        delegatingConf.set(integerOption, 0);
+        assertThat(delegatingConf.getInteger(integerOption, -1)).isZero();
+        delegatingConf.removeKey(integerOption.key());
+        assertThat(delegatingConf.getOptional(integerOption)).isEmpty();
+        assertThat(delegatingConf.getInteger(integerOption.key(), 0)).isZero();
+    }
 }

Reply via email to