This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ff9f928 KAFKA-6911; Fix dynamic keystore/truststore update check (#5029) ff9f928 is described below commit ff9f928c16ddf95311f1c1badc64212b4975e623 Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Fri May 25 00:24:37 2018 +0100 KAFKA-6911; Fix dynamic keystore/truststore update check (#5029) Fix the check, add unit test to verify the change, update `DynamicBrokerReconfigurationTest` to avoid dynamic keystore update in tests which are not expected to update keystores. --- .../kafka/common/security/ssl/SslFactory.java | 14 +++---- .../common/network/SslTransportLayerTest.java | 47 +++++++++++++--------- .../server/DynamicBrokerReconfigurationTest.scala | 10 +++-- 3 files changed, 40 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index 6989349..055404c 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -176,10 +176,10 @@ public class SslFactory implements Reconfigurable { } private SecurityStore maybeCreateNewKeystore(Map<String, ?> configs) { - boolean keystoreChanged = Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), keystore.type) || - Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), keystore.path) || - Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) || - Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword); + boolean keystoreChanged = !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), keystore.type) || + !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), keystore.path) || + !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) || + 
!Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword); if (keystoreChanged) { return createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), @@ -191,9 +191,9 @@ public class SslFactory implements Reconfigurable { } private SecurityStore maybeCreateNewTruststore(Map<String, ?> configs) { - boolean truststoreChanged = Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), truststore.type) || - Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) || - Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password); + boolean truststoreChanged = !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), truststore.type) || + !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) || + !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password); if (truststoreChanged) { return createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index f375dd6..2df4c4f 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -847,18 +847,14 @@ public class SslTransportLayerTest { CertStores invalidCertStores = new CertStores(true, "server", "127.0.0.1"); Map<String, Object> invalidConfigs = invalidCertStores.getTrustingConfig(clientCertStores); - try { - reconfigurableBuilder.validateReconfiguration(invalidConfigs); - fail("Should have failed validation with an exception with different SubjectAltName"); - } catch (KafkaException e) { - // expected exception - } - try { - reconfigurableBuilder.reconfigure(invalidConfigs); - fail("Should have failed to reconfigure with 
different SubjectAltName"); - } catch (KafkaException e) { - // expected exception - } + verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "keystore with different SubjectAltName"); + + Map<String, Object> missingStoreConfigs = new HashMap<>(); + missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12"); + missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "some.keystore.path"); + missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, new Password("some.keystore.password")); + missingStoreConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("some.key.password")); + verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "keystore not found"); // Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE); @@ -911,22 +907,33 @@ public class SslTransportLayerTest { Map<String, Object> invalidConfigs = new HashMap<>(newTruststoreConfigs); invalidConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "INVALID_TYPE"); + verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "invalid truststore type"); + + Map<String, Object> missingStoreConfigs = new HashMap<>(); + missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12"); + missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "some.truststore.path"); + missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("some.truststore.password")); + verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "truststore not found"); + + // Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration + newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10); + } + + private void 
verifyInvalidReconfigure(ListenerReconfigurable reconfigurable, + Map<String, Object> invalidConfigs, String errorMessage) { try { - reconfigurableBuilder.validateReconfiguration(invalidConfigs); - fail("Should have failed validation with an exception with invalid truststore type"); + reconfigurable.validateReconfiguration(invalidConfigs); + fail("Should have failed validation with an exception: " + errorMessage); } catch (KafkaException e) { // expected exception } try { - reconfigurableBuilder.reconfigure(invalidConfigs); - fail("Should have failed to reconfigure with with invalid truststore type"); + reconfigurable.reconfigure(invalidConfigs); + fail("Should have failed to reconfigure: " + errorMessage); } catch (KafkaException e) { // expected exception } - - // Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration - newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10); } private Selector createSelector(Map<String, Object> sslClientConfigs) { diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index fb96f9d..a4854ae 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -119,12 +119,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props ++= sslProperties1 props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureInternal)) - // Set invalid static properties to ensure that dynamic config is used + // Set invalid top-level properties to ensure that listener config is used + // Don't set any dynamic configs here since they get overridden in tests props ++= invalidSslProperties - props ++= 
securityProps(invalidSslProperties, KEYSTORE_PROPS, listenerPrefix(SecureExternal)) + props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS, "") + props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal)) val kafkaConfig = KafkaConfig.fromProps(props) - configureDynamicKeystoreInZooKeeper(kafkaConfig, Seq(brokerId), sslProperties1) servers += TestUtils.createServer(kafkaConfig) } @@ -183,7 +184,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet if (overrideCount > 0) { val listenerPrefix = "listener.name.external.ssl." verifySynonym(configName, synonyms.get(0), isSensitive, listenerPrefix, ConfigSource.DYNAMIC_BROKER_CONFIG, sslProperties1) - verifySynonym(configName, synonyms.get(1), isSensitive, listenerPrefix, ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties) + verifySynonym(configName, synonyms.get(1), isSensitive, listenerPrefix, ConfigSource.STATIC_BROKER_CONFIG, sslProperties1) } verifySynonym(configName, synonyms.get(overrideCount), isSensitive, "ssl.", ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties) defaultValue.foreach { value => @@ -204,6 +205,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } val adminClient = adminClients.head + alterSslKeystoreUsingConfigCommand(sslProperties1, SecureExternal) val configDesc = describeConfig(adminClient) verifySslConfig("listener.name.external.", sslProperties1, configDesc) -- To stop receiving notification emails like this one, please contact j...@apache.org.