This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.19 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 2b9b9ebb098035eac9ef88ddfd570cba0e79e2b1 Author: exceptionfactory <exceptionfact...@apache.org> AuthorDate: Thu Dec 1 08:15:20 2022 -0600 NIFI-10919 Corrected SCRAM SASL Mechanism for Kafka Components This closes #6743 Signed-off-by: Paul Grey <gr...@apache.org> --- .../login/DelegatingLoginConfigProvider.java | 2 +- .../nifi/kafka/shared/property/SaslMechanism.java | 10 +++ .../StandardKafkaPropertyNameProvider.java | 16 +++-- .../provider/StandardKafkaPropertyProvider.java | 2 +- .../login/DelegatingLoginConfigProviderTest.java | 69 +++++++++++++++++++ .../StandardKafkaPropertyProviderTest.java | 80 ++++++++++++++++++++++ 6 files changed, 171 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java index c9b81bc595..2be8274606 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java @@ -47,7 +47,7 @@ public class DelegatingLoginConfigProvider implements LoginConfigProvider { @Override public String getConfiguration(final PropertyContext context) { final String saslMechanismProperty = context.getProperty(KafkaClientComponent.SASL_MECHANISM).getValue(); - final SaslMechanism saslMechanism = SaslMechanism.valueOf(saslMechanismProperty); + final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(saslMechanismProperty); final LoginConfigProvider loginConfigProvider = PROVIDERS.getOrDefault(saslMechanism, SCRAM_PROVIDER); return loginConfigProvider.getConfiguration(context); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java index 619daeb86a..a9da7714e3 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java @@ -18,6 +18,9 @@ package org.apache.nifi.kafka.shared.property; import org.apache.nifi.components.DescribedValue; +import java.util.Arrays; +import java.util.Optional; + /** * Enumeration of supported Kafka SASL Mechanisms */ @@ -42,6 +45,13 @@ public enum SaslMechanism implements DescribedValue { this.description = description; } + public static SaslMechanism getSaslMechanism(final String value) { + final Optional<SaslMechanism> foundSaslMechanism = Arrays.stream(SaslMechanism.values()) + .filter(saslMechanism -> saslMechanism.getValue().equals(value)) + .findFirst(); + return foundSaslMechanism.orElseThrow(() -> new IllegalArgumentException(String.format("SaslMechanism value [%s] not found", value))); + } + @Override public String getValue() { return value; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java index 10979716a4..2083bfc7e7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java @@ -19,6 +19,7 @@ package org.apache.nifi.kafka.shared.property.provider; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.LinkedHashSet; +import java.util.Optional; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -58,9 +59,12 @@ public class StandardKafkaPropertyNameProvider implements KafkaPropertyNameProvi final Set<String> propertyNames = new LinkedHashSet<>(); for (final String propertyClassName : PROPERTY_CLASSES) { - final Class<?> propertyClass = getClass(propertyClassName); - final Set<String> classPropertyNames = getStaticStringPropertyNames(propertyClass); - propertyNames.addAll(classPropertyNames); + final Optional<Class<?>> propertyClassFound = findClass(propertyClassName); + if (propertyClassFound.isPresent()) { + final Class<?> propertyClass = propertyClassFound.get(); + final Set<String> classPropertyNames = getStaticStringPropertyNames(propertyClass); + propertyNames.addAll(classPropertyNames); + } } return propertyNames; @@ -93,11 +97,11 @@ public class StandardKafkaPropertyNameProvider implements KafkaPropertyNameProvi } } - private static Class<?> getClass(final String className) { + private static Optional<Class<?>> findClass(final String className) { try { - return Class.forName(className); + return Optional.of(Class.forName(className)); } catch (final ClassNotFoundException e) { - throw new IllegalStateException("Kafka Configuration Class not found", e); + return Optional.empty(); } } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java index a9ae4d6320..fd06acb6d0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java @@ -83,7 +83,7 @@ public class StandardKafkaPropertyProvider implements KafkaPropertyProvider { final String loginConfig = LOGIN_CONFIG_PROVIDER.getConfiguration(context); properties.put(SASL_JAAS_CONFIG.getProperty(), loginConfig); - final SaslMechanism saslMechanism = SaslMechanism.valueOf(context.getProperty(SASL_MECHANISM).getValue()); + final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(context.getProperty(SASL_MECHANISM).getValue()); if (SaslMechanism.GSSAPI == saslMechanism && isCustomKerberosLoginFound()) { properties.put(SASL_LOGIN_CLASS.getProperty(), SASL_GSSAPI_CUSTOM_LOGIN_CLASS); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProviderTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProviderTest.java new file mode 100644 index 0000000000..fef038a0ee --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProviderTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.shared.login; + +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.kafka.shared.component.KafkaClientComponent; +import org.apache.nifi.kafka.shared.property.SaslMechanism; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class DelegatingLoginConfigProviderTest { + + private static final String PLAIN_LOGIN_MODULE = "PlainLoginModule"; + + private static final String SCRAM_LOGIN_MODULE = "ScramLoginModule"; + + DelegatingLoginConfigProvider provider; + + TestRunner runner; + + @BeforeEach + void setProvider() { + provider = new DelegatingLoginConfigProvider(); + runner = TestRunners.newTestRunner(NoOpProcessor.class); + runner.setValidateExpressionUsage(false); + } + + @Test + void testGetConfigurationPlain() { + runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.PLAIN.getValue()); + final PropertyContext propertyContext = runner.getProcessContext(); + + final String configuration = provider.getConfiguration(propertyContext); + + assertNotNull(configuration); + assertTrue(configuration.contains(PLAIN_LOGIN_MODULE), "PLAIN configuration not found"); + } + + @Test + void testGetConfigurationScram() { + runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_512.getValue()); + final PropertyContext propertyContext = runner.getProcessContext(); + + final String configuration = provider.getConfiguration(propertyContext); + + assertNotNull(configuration); + assertTrue(configuration.contains(SCRAM_LOGIN_MODULE), "SCRAM configuration not found"); + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java new file mode 100644 index 0000000000..bce3412186 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.shared.property.provider; + +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.kafka.shared.component.KafkaClientComponent; +import org.apache.nifi.kafka.shared.property.KafkaClientProperty; +import org.apache.nifi.kafka.shared.property.SaslMechanism; +import org.apache.nifi.kafka.shared.property.SecurityProtocol; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class StandardKafkaPropertyProviderTest { + + private static final String SCRAM_LOGIN_MODULE = "ScramLoginModule"; + + StandardKafkaPropertyProvider provider; + + TestRunner runner; + + @BeforeEach + void setProvider() { + provider = new StandardKafkaPropertyProvider(String.class); + runner = TestRunners.newTestRunner(NoOpProcessor.class); + runner.setValidateExpressionUsage(false); + } + + @Test + void testGetProperties() { + final SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + + runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL, securityProtocol.name()); + final PropertyContext propertyContext = runner.getProcessContext(); + + final Map<String, Object> properties = provider.getProperties(propertyContext); + + assertEquals(securityProtocol.name(), properties.get(KafkaClientComponent.SECURITY_PROTOCOL.getName())); + } + + @Test + void testGetPropertiesSaslMechanismScram() { + final SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + + runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL, securityProtocol.name()); + runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_256.getValue()); + final PropertyContext propertyContext = runner.getProcessContext(); + + final Map<String, Object> properties = provider.getProperties(propertyContext); + + final Object securityProtocolProperty = properties.get(KafkaClientComponent.SECURITY_PROTOCOL.getName()); + assertEquals(securityProtocol.name(), securityProtocolProperty); + + final Object saslConfigProperty = properties.get(KafkaClientProperty.SASL_JAAS_CONFIG.getProperty()); + assertNotNull(saslConfigProperty, "SASL configuration not found"); + assertTrue(saslConfigProperty.toString().contains(SCRAM_LOGIN_MODULE), "SCRAM configuration not found"); + } +}