This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d43abf1e09 KAFKA-14770: Allow dynamic keystore update for brokers if 
string representation of DN matches even if canonical DNs don't match (#13346)
4d43abf1e09 is described below

commit 4d43abf1e09e01fc5e7af52f65e3fbae02cf9771
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Mar 7 09:41:01 2023 +0000

    KAFKA-14770: Allow dynamic keystore update for brokers if string 
representation of DN matches even if canonical DNs don't match (#13346)
    
    To avoid mistakes during dynamic broker config updates that could 
potentially affect clients, we restrict changes that can be performed 
dynamically without broker restart. For broker keystore updates, we require the 
DN to be the same for the old and new certificates since this could potentially 
contain host names used for host name verification by clients. DNs are compared 
using standard Java implementation of X500Principal.equals() which compares 
canonical names. If tags of fields ch [...]
    
    Reviewers: Manikumar Reddy <[email protected]>, Kalpesh Patel 
<[email protected]>
---
 .../kafka/common/security/ssl/SslFactory.java      |  8 ++++-
 .../kafka/common/security/ssl/SslFactoryTest.java  | 38 ++++++++++++++++++++++
 .../java/org/apache/kafka/test/TestSslUtils.java   | 19 +++++++++--
 3 files changed, 62 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index d0cc4cc1e69..65c37aa6b47 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -312,7 +312,13 @@ public class SslFactory implements Reconfigurable, 
Closeable {
             for (int i = 0; i < newEntries.size(); i++) {
                 CertificateEntries newEntry = newEntries.get(i);
                 CertificateEntries oldEntry = oldEntries.get(i);
-                if (!Objects.equals(newEntry.subjectPrincipal, 
oldEntry.subjectPrincipal)) {
+                Principal newPrincipal = newEntry.subjectPrincipal;
+                Principal oldPrincipal = oldEntry.subjectPrincipal;
+                // Compare principal objects to compare canonical names (e.g. 
to ignore leading/trailing whitespace).
+                // Canonical names may differ if the tag of a field changes 
from one with a printable string representation
+                // to one without, or vice versa, due to optional conversion to 
hex representation based on the tag. So we
+                // also compare the RFC 2253 names returned by Principal.getName(), 
ignoring case. If either matches, allow dynamic update.
+                if (!Objects.equals(newPrincipal, oldPrincipal) && 
!newPrincipal.getName().equalsIgnoreCase(oldPrincipal.getName())) {
                     throw new ConfigException(String.format("Keystore 
DistinguishedName does not match: " +
                         " existing={alias=%s, DN=%s}, new={alias=%s, DN=%s}",
                         oldEntry.alias, oldEntry.subjectPrincipal, 
newEntry.alias, newEntry.subjectPrincipal));
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 21dcd6e4b0f..7ac707b5de7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -20,7 +20,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.security.GeneralSecurityException;
+import java.security.KeyPair;
 import java.security.KeyStore;
+import java.security.cert.X509Certificate;
 import java.util.Arrays;
 import java.util.Map;
 
@@ -46,6 +48,7 @@ import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.kafka.common.security.ssl.SslFactory.CertificateEntries.ensureCompatible;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -521,6 +524,41 @@ public abstract class SslFactoryTest {
         
assertFalse(securityConfig.unused().contains(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG));
     }
 
+    @Test
+    public void testDynamicUpdateCompatibility() throws Exception {
+        KeyPair keyPair = TestSslUtils.generateKeyPair("RSA");
+        KeyStore ks = createKeyStore(keyPair, "*.example.com", "Kafka", true, 
"localhost", "*.example.com");
+        ensureCompatible(ks, ks);
+        ensureCompatible(ks, createKeyStore(keyPair, "*.example.com", "Kafka", 
true, "localhost", "*.example.com"));
+        ensureCompatible(ks, createKeyStore(keyPair, " *.example.com", " Kafka 
", true, "localhost", "*.example.com"));
+        ensureCompatible(ks, createKeyStore(keyPair, "*.example.COM", "Kafka", 
true, "localhost", "*.example.com"));
+        ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "KAFKA", 
true, "localhost", "*.example.com"));
+        ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "Kafka", 
true, "*.example.com"));
+        ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "Kafka", 
true, "localhost"));
+
+        ensureCompatible(ks, createKeyStore(keyPair, "*.example.com", "Kafka", 
false, "localhost", "*.example.com"));
+        ensureCompatible(ks, createKeyStore(keyPair, "*.example.COM", "Kafka", 
false, "localhost", "*.example.com"));
+        ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "KAFKA", 
false, "localhost", "*.example.com"));
+        ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "Kafka", 
false, "*.example.com"));
+        ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", "Kafka", 
false, "localhost"));
+
+        assertThrows(ConfigException.class, () ->
+                ensureCompatible(ks, createKeyStore(keyPair, " *.example.com", 
" Kafka ", false, "localhost", "*.example.com")));
+        assertThrows(ConfigException.class, () ->
+                ensureCompatible(ks, createKeyStore(keyPair, 
"*.another.example.com", "Kafka", true, "*.example.com")));
+        assertThrows(ConfigException.class, () ->
+                ensureCompatible(ks, createKeyStore(keyPair, "*.EXAMPLE.COM", 
"Kafka", true, "*.another.example.com")));
+    }
+
+    private KeyStore createKeyStore(KeyPair keyPair, String commonName, String 
org, boolean utf8, String... dnsNames) throws Exception {
+        X509Certificate cert = new 
TestSslUtils.CertificateBuilder().sanDnsNames(dnsNames)
+                .generate(commonName, org, utf8, keyPair);
+        KeyStore ks = KeyStore.getInstance("PKCS12");
+        ks.load(null, null);
+        ks.setKeyEntry("kafka", keyPair.getPrivate(), null, new 
X509Certificate[] {cert});
+        return ks;
+    }
+
     private KeyStore sslKeyStore(Map<String, Object> sslConfig) {
         SecurityStore store;
         if (sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) != null) {
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 6d006ea9c9e..675e6563f87 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -23,7 +23,12 @@ import 
org.apache.kafka.common.security.auth.SslEngineFactory;
 import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
 import org.bouncycastle.asn1.DEROctetString;
 import org.bouncycastle.asn1.DERSequence;
+import org.bouncycastle.asn1.DERT61String;
+import org.bouncycastle.asn1.DERUTF8String;
+import org.bouncycastle.asn1.x500.AttributeTypeAndValue;
+import org.bouncycastle.asn1.x500.RDN;
 import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
 import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
 import org.bouncycastle.asn1.x509.Extension;
 import org.bouncycastle.asn1.x509.GeneralName;
@@ -382,6 +387,17 @@ public class TestSslUtils {
         }
 
         public X509Certificate generate(String dn, KeyPair keyPair) throws 
CertificateException {
+            return generate(new X500Name(dn), keyPair);
+        }
+
+        public X509Certificate generate(String commonName, String org, boolean 
utf8, KeyPair keyPair) throws CertificateException {
+            RDN[] rdns = new RDN[2];
+            rdns[0] = new RDN(new AttributeTypeAndValue(BCStyle.CN, utf8 ? new 
DERUTF8String(commonName) : new DERT61String(commonName)));
+            rdns[1] = new RDN(new AttributeTypeAndValue(BCStyle.O, utf8 ? new 
DERUTF8String(org) : new DERT61String(org)));
+            return generate(new X500Name(rdns), keyPair);
+        }
+
+        public X509Certificate generate(X500Name dn, KeyPair keyPair) throws 
CertificateException {
             try {
                 Security.addProvider(new BouncyCastleProvider());
                 AlgorithmIdentifier sigAlgId = new 
DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
@@ -399,11 +415,10 @@ public class TestSslUtils {
                 else
                     throw new IllegalArgumentException("Unsupported algorithm 
" + keyAlgorithm);
                 ContentSigner sigGen = 
signerBuilder.build(privateKeyAsymKeyParam);
-                X500Name name = new X500Name(dn);
                 Date from = new Date();
                 Date to = new Date(from.getTime() + days * 86400000L);
                 BigInteger sn = new BigInteger(64, new SecureRandom());
-                X509v3CertificateBuilder v3CertGen = new 
X509v3CertificateBuilder(name, sn, from, to, name, subPubKeyInfo);
+                X509v3CertificateBuilder v3CertGen = new 
X509v3CertificateBuilder(dn, sn, from, to, dn, subPubKeyInfo);
 
                 if (subjectAltName != null)
                     v3CertGen.addExtension(Extension.subjectAlternativeName, 
false, subjectAltName);

Reply via email to