exceptionfactory commented on code in PR #6846:
URL: https://github.com/apache/nifi/pull/6846#discussion_r1071357824


##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java:
##########
@@ -107,6 +107,35 @@ public interface KafkaClientComponent {
             )
             .build();
 
+    PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder()
+            .name("aws.profile.name")
+            .displayName("AWS Profile Name")
+            .description("The AWS Profile to consider when there are multiple 
profiles available.")

Review Comment:
   ```suggestion
               .description("The Amazon Web Services Profile to select when 
multiple profiles are available.")
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java:
##########
@@ -107,6 +107,35 @@ public interface KafkaClientComponent {
             )
             .build();
 
+    PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder()
+            .name("aws.profile.name")
+            .displayName("AWS Profile Name")
+            .description("The AWS Profile to consider when there are multiple 
profiles available.")
+            .dependsOn(
+                    SASL_MECHANISM,
+                    SaslMechanism.AWS_MSK_IAM
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    PropertyDescriptor AWS_DEBUG_CREDS = new PropertyDescriptor.Builder()

Review Comment:
   Recommend removing this property descriptor and associated references since 
it is not necessary or recommend for standard operations.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java:
##########
@@ -160,9 +165,17 @@ private void setProperty(final Map<String, Object> 
properties, final String prop
         }
     }
 
-    private boolean isCustomKerberosLoginFound() {
+    private static boolean isCustomKerberosLoginFound() {
+        return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+    }
+
+    public static boolean isIAMCallbackHandlerFound() {
+        return isClassFound(SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+    }
+
+    private static boolean isClassFound(String clazz) {

Review Comment:
   ```suggestion
       private static boolean isClassFound(final String className) {
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java:
##########
@@ -107,6 +107,35 @@ public interface KafkaClientComponent {
             )
             .build();
 
+    PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder()
+            .name("aws.profile.name")
+            .displayName("AWS Profile Name")
+            .description("The AWS Profile to consider when there are multiple 
profiles available.")
+            .dependsOn(
+                    SASL_MECHANISM,
+                    SaslMechanism.AWS_MSK_IAM
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)

Review Comment:
   Recommend removing support for Variable Registry expressions because this 
feature is subject to removal and Parameters should be used instead.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java:
##########
@@ -233,6 +235,19 @@ private void validateUsernamePassword(final 
ValidationContext validationContext,
         }
     }
 
+    private void validateAWSIAMMechanism(final ValidationContext 
validationContext, final Collection<ValidationResult> results) {
+        final String saslMechanism = 
validationContext.getProperty(SASL_MECHANISM).getValue();
+
+        if (SaslMechanism.AWS_MSK_IAM.getValue().equals(saslMechanism) && 
!StandardKafkaPropertyProvider.isIAMCallbackHandlerFound()) {
+            final String explanation = String.format("[%s] should be on 
classpath", 
StandardKafkaPropertyProvider.SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);

Review Comment:
   ```suggestion
               final String explanation = String.format("[%s] required class 
not found: Kafka modules must be compiled with AWS MSK enabled", 
StandardKafkaPropertyProvider.SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/IAMLoginConfigProvider.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.util.StringUtils;
+
+/**
+ * SASL AWS IAM Login Module implementation of configuration provider
+ */
+public class IAMLoginConfigProvider implements LoginConfigProvider {

Review Comment:
   Recommend adjusting the class name to be more specific to AWS MSK:
   
   ```suggestion
   public class AwsMskIAMLoginConfigProvider implements LoginConfigProvider {
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/IAMLoginConfigProvider.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.util.StringUtils;
+
+/**
+ * SASL AWS IAM Login Module implementation of configuration provider
+ */
+public class IAMLoginConfigProvider implements LoginConfigProvider {
+
+    private static final String MODULE_CLASS = 
"software.amazon.msk.auth.iam.IAMLoginModule";
+
+    private static final String AWS_PROFILE_NAME_FORMAT = 
"awsProfileName=\"%s\"";
+
+    private static final String AWS_ENABLE_DEBUG_CREDS = "awsDebugCreds=true";
+
+    @Override
+    public String getConfiguration(PropertyContext context) {
+        final String awsProfileName = 
context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).evaluateAttributeExpressions().getValue();
+        final boolean awsDebugCreds = 
context.getProperty(KafkaClientComponent.AWS_DEBUG_CREDS).asBoolean();
+
+        final LoginConfigBuilder builder = new 
LoginConfigBuilder(MODULE_CLASS);
+
+        if (!StringUtils.isBlank(awsProfileName)) {
+            builder.append(String.format(AWS_PROFILE_NAME_FORMAT, 
awsProfileName));
+        }
+
+        if (awsDebugCreds) {
+            builder.append(AWS_ENABLE_DEBUG_CREDS);
+        }

Review Comment:
   As mentioned on the property descriptor, recommend removing this setting.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java:
##########
@@ -31,7 +33,10 @@ public enum SaslMechanism implements DescribedValue {
 
     SCRAM_SHA_256("SCRAM-SHA-256", "SCRAM-SHA-256", "Salted Challenge Response 
Authentication Mechanism using SHA-512 with username and password"),
 
-    SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response 
Authentication Mechanism using SHA-256 with username and password");
+    SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response 
Authentication Mechanism using SHA-256 with username and password"),
+
+    AWS_MSK_IAM("AWS_MSK_IAM", "AWS IAM", "Allows to use AWS IAM for 
authentication and authorization against Amazon MSK clusters that have AWS IAM 
enabled " +

Review Comment:
   Recommend using the same value for both the name and value.
   ```suggestion
       AWS_MSK_IAM("AWS_MSK_IAM", "AWS_MSK_IAM", "Allows to use AWS IAM for 
authentication and authorization against Amazon MSK clusters that have AWS IAM 
enabled " +
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/IAMLoginConfigProvider.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.util.StringUtils;
+
+/**
+ * SASL AWS IAM Login Module implementation of configuration provider
+ */
+public class IAMLoginConfigProvider implements LoginConfigProvider {
+
+    private static final String MODULE_CLASS = 
"software.amazon.msk.auth.iam.IAMLoginModule";
+
+    private static final String AWS_PROFILE_NAME_FORMAT = 
"awsProfileName=\"%s\"";
+
+    private static final String AWS_ENABLE_DEBUG_CREDS = "awsDebugCreds=true";
+
+    @Override
+    public String getConfiguration(PropertyContext context) {
+        final String awsProfileName = 
context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).evaluateAttributeExpressions().getValue();
+        final boolean awsDebugCreds = 
context.getProperty(KafkaClientComponent.AWS_DEBUG_CREDS).asBoolean();
+
+        final LoginConfigBuilder builder = new 
LoginConfigBuilder(MODULE_CLASS);
+
+        if (!StringUtils.isBlank(awsProfileName)) {

Review Comment:
   ```suggestion
           if (StringUtils.isNotBlank(awsProfileName)) {
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java:
##########
@@ -160,9 +165,17 @@ private void setProperty(final Map<String, Object> 
properties, final String prop
         }
     }
 
-    private boolean isCustomKerberosLoginFound() {
+    private static boolean isCustomKerberosLoginFound() {
+        return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+    }
+
+    public static boolean isIAMCallbackHandlerFound() {

Review Comment:
   Recommend adjusting the method name to be more specific:
   ```suggestion
       public static boolean isAwsMskIAMCallbackHandlerFound() {
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java:
##########
@@ -160,9 +165,17 @@ private void setProperty(final Map<String, Object> 
properties, final String prop
         }
     }
 
-    private boolean isCustomKerberosLoginFound() {
+    private static boolean isCustomKerberosLoginFound() {
+        return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+    }
+
+    public static boolean isIAMCallbackHandlerFound() {
+        return isClassFound(SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+    }
+
+    private static boolean isClassFound(String clazz) {
         try {
-            Class.forName(SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+            Class.forName(clazz);

Review Comment:
   ```suggestion
               Class.forName(className);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to