This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff9f928  KAFKA-6911; Fix dynamic keystore/truststore update check (#5029)
ff9f928 is described below

commit ff9f928c16ddf95311f1c1badc64212b4975e623
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Fri May 25 00:24:37 2018 +0100

    KAFKA-6911; Fix dynamic keystore/truststore update check (#5029)
    
    Fix the check, add a unit test to verify the change, and update
    `DynamicBrokerReconfigurationTest` to avoid dynamic keystore updates in
    tests which are not expected to update keystores.
---
 .../kafka/common/security/ssl/SslFactory.java      | 14 +++----
 .../common/network/SslTransportLayerTest.java      | 47 +++++++++++++---------
 .../server/DynamicBrokerReconfigurationTest.scala  | 10 +++--
 3 files changed, 40 insertions(+), 31 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 6989349..055404c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -176,10 +176,10 @@ public class SslFactory implements Reconfigurable {
     }
 
     private SecurityStore maybeCreateNewKeystore(Map<String, ?> configs) {
-        boolean keystoreChanged = 
Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), keystore.type) 
||
-                
Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), 
keystore.path) ||
-                
Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), 
keystore.password) ||
-                
Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), 
keystore.keyPassword);
+        boolean keystoreChanged = 
!Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), 
keystore.type) ||
+                
!Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), 
keystore.path) ||
+                
!Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), 
keystore.password) ||
+                
!Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), 
keystore.keyPassword);
 
         if (keystoreChanged) {
             return createKeystore((String) 
configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
@@ -191,9 +191,9 @@ public class SslFactory implements Reconfigurable {
     }
 
     private SecurityStore maybeCreateNewTruststore(Map<String, ?> configs) {
-        boolean truststoreChanged = 
Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), 
truststore.type) ||
-                
Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), 
truststore.path) ||
-                
Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), 
truststore.password);
+        boolean truststoreChanged = 
!Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), 
truststore.type) ||
+                
!Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), 
truststore.path) ||
+                
!Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), 
truststore.password);
 
         if (truststoreChanged) {
             return createTruststore((String) 
configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index f375dd6..2df4c4f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -847,18 +847,14 @@ public class SslTransportLayerTest {
 
         CertStores invalidCertStores = new CertStores(true, "server", 
"127.0.0.1");
         Map<String, Object>  invalidConfigs = 
invalidCertStores.getTrustingConfig(clientCertStores);
-        try {
-            reconfigurableBuilder.validateReconfiguration(invalidConfigs);
-            fail("Should have failed validation with an exception with 
different SubjectAltName");
-        } catch (KafkaException e) {
-            // expected exception
-        }
-        try {
-            reconfigurableBuilder.reconfigure(invalidConfigs);
-            fail("Should have failed to reconfigure with different 
SubjectAltName");
-        } catch (KafkaException e) {
-            // expected exception
-        }
+        verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, 
"keystore with different SubjectAltName");
+
+        Map<String, Object>  missingStoreConfigs = new HashMap<>();
+        missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
+        missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
"some.keystore.path");
+        missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, new 
Password("some.keystore.password"));
+        missingStoreConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new 
Password("some.key.password"));
+        verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, 
"keystore not found");
 
         // Verify that new connections continue to work with the server with 
previously configured keystore after failed reconfiguration
         newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -911,22 +907,33 @@ public class SslTransportLayerTest {
 
         Map<String, Object>  invalidConfigs = new 
HashMap<>(newTruststoreConfigs);
         invalidConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
"INVALID_TYPE");
+        verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, 
"invalid truststore type");
+
+        Map<String, Object>  missingStoreConfigs = new HashMap<>();
+        missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
"PKCS12");
+        missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
"some.truststore.path");
+        missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, new 
Password("some.truststore.password"));
+        verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, 
"truststore not found");
+
+        // Verify that new connections continue to work with the server with 
previously configured keystore after failed reconfiguration
+        newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 
10);
+    }
+
+    private void verifyInvalidReconfigure(ListenerReconfigurable 
reconfigurable,
+                                          Map<String, Object>  invalidConfigs, 
String errorMessage) {
         try {
-            reconfigurableBuilder.validateReconfiguration(invalidConfigs);
-            fail("Should have failed validation with an exception with invalid 
truststore type");
+            reconfigurable.validateReconfiguration(invalidConfigs);
+            fail("Should have failed validation with an exception: " + 
errorMessage);
         } catch (KafkaException e) {
             // expected exception
         }
         try {
-            reconfigurableBuilder.reconfigure(invalidConfigs);
-            fail("Should have failed to reconfigure with with invalid 
truststore type");
+            reconfigurable.reconfigure(invalidConfigs);
+            fail("Should have failed to reconfigure: " + errorMessage);
         } catch (KafkaException e) {
             // expected exception
         }
-
-        // Verify that new connections continue to work with the server with 
previously configured keystore after failed reconfiguration
-        newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
-        NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 
10);
     }
 
     private Selector createSelector(Map<String, Object> sslClientConfigs) {
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index fb96f9d..a4854ae 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -119,12 +119,13 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
       props ++= sslProperties1
       props ++= securityProps(sslProperties1, KEYSTORE_PROPS, 
listenerPrefix(SecureInternal))
 
-      // Set invalid static properties to ensure that dynamic config is used
+      // Set invalid top-level properties to ensure that listener config is 
used
+      // Don't set any dynamic configs here since they get overridden in tests
       props ++= invalidSslProperties
-      props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS, 
listenerPrefix(SecureExternal))
+      props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS, "")
+      props ++= securityProps(sslProperties1, KEYSTORE_PROPS, 
listenerPrefix(SecureExternal))
 
       val kafkaConfig = KafkaConfig.fromProps(props)
-      configureDynamicKeystoreInZooKeeper(kafkaConfig, Seq(brokerId), 
sslProperties1)
 
       servers += TestUtils.createServer(kafkaConfig)
     }
@@ -183,7 +184,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
       if (overrideCount > 0) {
         val listenerPrefix = "listener.name.external.ssl."
         verifySynonym(configName, synonyms.get(0), isSensitive, 
listenerPrefix, ConfigSource.DYNAMIC_BROKER_CONFIG, sslProperties1)
-        verifySynonym(configName, synonyms.get(1), isSensitive, 
listenerPrefix, ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties)
+        verifySynonym(configName, synonyms.get(1), isSensitive, 
listenerPrefix, ConfigSource.STATIC_BROKER_CONFIG, sslProperties1)
       }
       verifySynonym(configName, synonyms.get(overrideCount), isSensitive, 
"ssl.", ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties)
       defaultValue.foreach { value =>
@@ -204,6 +205,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     }
 
     val adminClient = adminClients.head
+    alterSslKeystoreUsingConfigCommand(sslProperties1, SecureExternal)
 
     val configDesc = describeConfig(adminClient)
     verifySslConfig("listener.name.external.", sslProperties1, configDesc)

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to