Repository: flink
Updated Branches:
  refs/heads/master 5ccd90715 -> ad3454069


[FLINK-4309] Fix potential NPE in DelegatingConfiguration

This closes #2371.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad345406
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad345406
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad345406

Branch: refs/heads/master
Commit: ad3454069fa091cd453f6cefb0e56a8021a3c269
Parents: 5ccd907
Author: Sunny T <sunny@Sunnys-MacBook-Pro-3.local>
Authored: Sun Aug 14 15:50:08 2016 -0700
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue Aug 16 12:39:42 2016 +0200

----------------------------------------------------------------------
 .../configuration/DelegatingConfiguration.java  | 12 ++++--
 .../DelegatingConfigurationTest.java            | 45 ++++++++++++++++++++
 2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad345406/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 91f81d4..dba77f3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.flink.configuration;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -56,7 +57,7 @@ public final class DelegatingConfiguration extends 
Configuration {
         */
        public DelegatingConfiguration(Configuration backingConfig, String 
prefix)
        {
-               this.backingConfig = backingConfig;
+               this.backingConfig = Preconditions.checkNotNull(backingConfig);
                this.prefix = prefix;
        }
 
@@ -178,14 +179,19 @@ public final class DelegatingConfiguration extends 
Configuration {
 
        @Override
        public Set<String> keySet() {
+               if (this.prefix == null) {
+                       return this.backingConfig.keySet();
+               }
+
                final HashSet<String> set = new HashSet<String>();
-               final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
+               int prefixLen = this.prefix.length();
 
                for (String key : this.backingConfig.keySet()) {
-                       if (key.startsWith(this.prefix)) {
+                       if (key.startsWith(prefix)) {
                                set.add(key.substring(prefixLen));
                        }
                }
+
                return set;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad345406/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
index 6d42129..d8b782d 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
@@ -26,8 +26,10 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Set;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 
 public class DelegatingConfigurationTest {
@@ -88,4 +90,47 @@ public class DelegatingConfigurationTest {
                        assertTrue("Foo method '" + 
configurationMethod.getName() + "' has not been wrapped correctly in 
DelegatingConfiguration wrapper", hasMethod);
                }
        }
+       
+       @Test
+       public void testDelegationConfigurationWithNullPrefix() {
+               Configuration backingConf = new Configuration();
+               backingConf.setValueInternal("test-key", "value");
+
+               DelegatingConfiguration configuration = new 
DelegatingConfiguration(
+                               backingConf, null);
+               Set<String> keySet = configuration.keySet();
+
+               assertEquals(keySet, backingConf.keySet());
+
+       }
+
+       @Test
+       public void testDelegationConfigurationWithPrefix() {
+               String prefix = "pref-";
+               String expectedKey = "key";
+
+               /*
+                * Key matches the prefix
+                */
+               Configuration backingConf = new Configuration();
+               backingConf.setValueInternal(prefix + expectedKey, "value");
+
+               DelegatingConfiguration configuration = new 
DelegatingConfiguration(backingConf, prefix);
+               Set<String> keySet = configuration.keySet();
+               
+
+               assertEquals(keySet.size(), 1);
+               assertEquals(keySet.iterator().next(), expectedKey);
+
+               /*
+                * Key does not match the prefix
+                */
+               backingConf = new Configuration();
+               backingConf.setValueInternal("test-key", "value");
+
+               configuration = new DelegatingConfiguration(backingConf, 
prefix);
+               keySet = configuration.keySet();
+
+               assertTrue(keySet.isEmpty());
+       }
 }

Reply via email to