exceptionfactory commented on code in PR #6846: URL: https://github.com/apache/nifi/pull/6846#discussion_r1071357824
########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java: ########## @@ -107,6 +107,35 @@ public interface KafkaClientComponent { ) .build(); + PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder() + .name("aws.profile.name") + .displayName("AWS Profile Name") + .description("The AWS Profile to consider when there are multiple profiles available.") Review Comment: ```suggestion .description("The Amazon Web Services Profile to select when multiple profiles are available.") ``` ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java: ########## @@ -107,6 +107,35 @@ public interface KafkaClientComponent { ) .build(); + PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder() + .name("aws.profile.name") + .displayName("AWS Profile Name") + .description("The AWS Profile to consider when there are multiple profiles available.") + .dependsOn( + SASL_MECHANISM, + SaslMechanism.AWS_MSK_IAM + ) + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + PropertyDescriptor AWS_DEBUG_CREDS = new PropertyDescriptor.Builder() Review Comment: Recommend removing this property descriptor and associated references since it is not necessary or recommend for standard operations. ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java: ########## @@ -160,9 +165,17 @@ private void setProperty(final Map<String, Object> properties, final String prop } } - private boolean isCustomKerberosLoginFound() { + private static boolean isCustomKerberosLoginFound() { + return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS); + } + + public static boolean isIAMCallbackHandlerFound() { + return isClassFound(SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); + } + + private static boolean isClassFound(String clazz) { Review Comment: ```suggestion private static boolean isClassFound(final String className) { ``` ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java: ########## @@ -107,6 +107,35 @@ public interface KafkaClientComponent { ) .build(); + PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder() + .name("aws.profile.name") + .displayName("AWS Profile Name") + .description("The AWS Profile to consider when there are multiple profiles available.") + .dependsOn( + SASL_MECHANISM, + SaslMechanism.AWS_MSK_IAM + ) + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) Review Comment: Recommend removing support for Variable Registry expressions because this feature is subject to removal and Parameters should be used instead. ```suggestion ``` ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java: ########## @@ -233,6 +235,19 @@ private void validateUsernamePassword(final ValidationContext validationContext, } } + private void validateAWSIAMMechanism(final ValidationContext validationContext, final Collection<ValidationResult> results) { + final String saslMechanism = validationContext.getProperty(SASL_MECHANISM).getValue(); + + if (SaslMechanism.AWS_MSK_IAM.getValue().equals(saslMechanism) && !StandardKafkaPropertyProvider.isIAMCallbackHandlerFound()) { + final String explanation = String.format("[%s] should be on classpath", StandardKafkaPropertyProvider.SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); Review Comment: ```suggestion final String explanation = String.format("[%s] required class not found: Kafka modules must be compiled with AWS MSK enabled", StandardKafkaPropertyProvider.SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); ``` ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/IAMLoginConfigProvider.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.shared.login; + +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.kafka.shared.component.KafkaClientComponent; +import org.apache.nifi.util.StringUtils; + +/** + * SASL AWS IAM Login Module implementation of configuration provider + */ +public class IAMLoginConfigProvider implements LoginConfigProvider { Review Comment: Recommend adjusting the class name to be more specific to AWS MSK: ```suggestion public class AwsMskIAMLoginConfigProvider implements LoginConfigProvider { ``` ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/IAMLoginConfigProvider.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.shared.login; + +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.kafka.shared.component.KafkaClientComponent; +import org.apache.nifi.util.StringUtils; + +/** + * SASL AWS IAM Login Module implementation of configuration provider + */ +public class IAMLoginConfigProvider implements LoginConfigProvider { + + private static final String MODULE_CLASS = "software.amazon.msk.auth.iam.IAMLoginModule"; + + private static final String AWS_PROFILE_NAME_FORMAT = "awsProfileName=\"%s\""; + + private static final String AWS_ENABLE_DEBUG_CREDS = "awsDebugCreds=true"; + + @Override + public String getConfiguration(PropertyContext context) { + final String awsProfileName = context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).evaluateAttributeExpressions().getValue(); + final boolean awsDebugCreds = context.getProperty(KafkaClientComponent.AWS_DEBUG_CREDS).asBoolean(); + + final LoginConfigBuilder builder = new LoginConfigBuilder(MODULE_CLASS); + + if (!StringUtils.isBlank(awsProfileName)) { + builder.append(String.format(AWS_PROFILE_NAME_FORMAT, awsProfileName)); + } + + if (awsDebugCreds) { + builder.append(AWS_ENABLE_DEBUG_CREDS); + } Review Comment: As mentioned on the property descriptor, recommend removing this setting. ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java: ########## @@ -31,7 +33,10 @@ public enum SaslMechanism implements DescribedValue { SCRAM_SHA_256("SCRAM-SHA-256", "SCRAM-SHA-256", "Salted Challenge Response Authentication Mechanism using SHA-512 with username and password"), - SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response Authentication Mechanism using SHA-256 with username and password"); + SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response Authentication Mechanism using SHA-256 with username and password"), + + AWS_MSK_IAM("AWS_MSK_IAM", "AWS IAM", "Allows to use AWS IAM for authentication and authorization against Amazon MSK clusters that have AWS IAM enabled " + Review Comment: Recommend using the same value for both the name and value. ```suggestion AWS_MSK_IAM("AWS_MSK_IAM", "AWS_MSK_IAM", "Allows to use AWS IAM for authentication and authorization against Amazon MSK clusters that have AWS IAM enabled " + ``` ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/IAMLoginConfigProvider.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.shared.login; + +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.kafka.shared.component.KafkaClientComponent; +import org.apache.nifi.util.StringUtils; + +/** + * SASL AWS IAM Login Module implementation of configuration provider + */ +public class IAMLoginConfigProvider implements LoginConfigProvider { + + private static final String MODULE_CLASS = "software.amazon.msk.auth.iam.IAMLoginModule"; + + private static final String AWS_PROFILE_NAME_FORMAT = "awsProfileName=\"%s\""; + + private static final String AWS_ENABLE_DEBUG_CREDS = "awsDebugCreds=true"; + + @Override + public String getConfiguration(PropertyContext context) { + final String awsProfileName = context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).evaluateAttributeExpressions().getValue(); + final boolean awsDebugCreds = context.getProperty(KafkaClientComponent.AWS_DEBUG_CREDS).asBoolean(); + + final LoginConfigBuilder builder = new LoginConfigBuilder(MODULE_CLASS); + + if (!StringUtils.isBlank(awsProfileName)) { Review Comment: ```suggestion if (StringUtils.isNotBlank(awsProfileName)) { ``` ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java: ########## @@ -160,9 +165,17 @@ private void setProperty(final Map<String, Object> properties, final String prop } } - private boolean isCustomKerberosLoginFound() { + private static boolean isCustomKerberosLoginFound() { + return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS); + } + + public static boolean isIAMCallbackHandlerFound() { Review Comment: Recommend adjusting the method name to be more specific: ```suggestion public static boolean isAwsMskIAMCallbackHandlerFound() { ``` ########## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java: ########## @@ -160,9 +165,17 @@ private void setProperty(final Map<String, Object> properties, final String prop } } - private boolean isCustomKerberosLoginFound() { + private static boolean isCustomKerberosLoginFound() { + return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS); + } + + public static boolean isIAMCallbackHandlerFound() { + return isClassFound(SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); + } + + private static boolean isClassFound(String clazz) { try { - Class.forName(SASL_GSSAPI_CUSTOM_LOGIN_CLASS); + Class.forName(clazz); Review Comment: ```suggestion Class.forName(className); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org