This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ccef4cfb7 NIFI-14927 Added AWS Role Properties to AmazonMSKConnectionService (#10278)
9ccef4cfb7 is described below

commit 9ccef4cfb720a351f5d259a120d7c14ab9b62e78
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Sep 9 14:44:38 2025 +0200

    NIFI-14927 Added AWS Role Properties to AmazonMSKConnectionService (#10278)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../service/aws/AmazonMSKConnectionService.java    | 14 +++++
 .../shared/component/KafkaClientComponent.java     | 46 ++++++++++++--
 .../shared/login/AwsMskIamLoginConfigProvider.java | 21 +++++--
 .../nifi/kafka/shared/property/AwsRoleSource.java  | 51 +++++++++++++++
 .../login/AwsMskIamLoginConfigProviderTest.java    | 73 ++++++++++++++++++++++
 5 files changed, 197 insertions(+), 8 deletions(-)

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
index 76a6b56d07..32236d120a 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
@@ -19,8 +19,10 @@ package org.apache.nifi.kafka.service.aws;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.migration.PropertyConfiguration;
 import org.apache.nifi.kafka.service.Kafka3ConnectionService;
 import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
 
 import java.util.ArrayList;
@@ -52,7 +54,10 @@ public class AmazonMSKConnectionService extends Kafka3ConnectionService {
                 descriptors.remove();
                 // Add AWS MSK properties
                 descriptors.add(AWS_SASL_MECHANISM);
+                descriptors.add(KafkaClientComponent.AWS_ROLE_SOURCE);
                 descriptors.add(KafkaClientComponent.AWS_PROFILE_NAME);
+                descriptors.add(KafkaClientComponent.AWS_ASSUME_ROLE_ARN);
+                descriptors.add(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME);
             }
         }
 
@@ -63,4 +68,13 @@ public class AmazonMSKConnectionService extends Kafka3ConnectionService {
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return supportedPropertyDescriptors;
     }
+
+    @Override
+    public void migrateProperties(final PropertyConfiguration config) {
+        // For backward compatibility: if an AWS Profile Name was configured previously,
+        // set AWS Role Source to SPECIFIED_PROFILE
+        if (config.isPropertySet(KafkaClientComponent.AWS_PROFILE_NAME)) {
+            config.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE, AwsRoleSource.SPECIFIED_PROFILE.name());
+        }
+    }
 }
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index 771805870f..5d88cf108e 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -18,6 +18,7 @@ package org.apache.nifi.kafka.shared.component;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
 import org.apache.nifi.kafka.shared.property.SecurityProtocol;
 import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
@@ -107,17 +108,54 @@ public interface KafkaClientComponent {
             )
             .build();
 
+    PropertyDescriptor AWS_ROLE_SOURCE = new PropertyDescriptor.Builder()
+            .name("AWS Role Source")
+            .description("Select how AWS credentials are sourced for AWS MSK IAM: Default Profile searches standard locations," +
+                    " Specified Profile selects a named profile, or Specified Role configures a Role ARN and Session Name.")
+            .required(true)
+            .allowableValues(AwsRoleSource.class)
+            .defaultValue(AwsRoleSource.DEFAULT_PROFILE)
+            .dependsOn(
+                    SASL_MECHANISM,
+                    SaslMechanism.AWS_MSK_IAM
+            )
+            .build();
+
     PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder()
             .name("aws.profile.name")
             .displayName("AWS Profile Name")
             .description("The Amazon Web Services Profile to select when multiple profiles are available.")
             .dependsOn(
-                    SASL_MECHANISM,
-                    SaslMechanism.AWS_MSK_IAM
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.SPECIFIED_PROFILE
+            )
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    PropertyDescriptor AWS_ASSUME_ROLE_ARN = new PropertyDescriptor.Builder()
+            .name("AWS Assume Role ARN")
+            .description("The AWS Role ARN for cross-account access when using AWS MSK IAM. Used with Assume Role Session Name.")
+            .required(true)
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.SPECIFIED_ROLE
             )
-            .required(false)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    PropertyDescriptor AWS_ASSUME_ROLE_SESSION_NAME = new PropertyDescriptor.Builder()
+            .name("AWS Assume Role Session Name")
+            .description("The AWS Role Session Name for cross-account access. Used in conjunction with Assume Role ARN.")
+            .required(true)
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.SPECIFIED_ROLE
+            )
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
 
     PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
index a5adfc12d1..a6e02a6d48 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.kafka.shared.login;
 
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
 import org.apache.nifi.util.StringUtils;
 
 import static javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
@@ -30,15 +31,27 @@ public class AwsMskIamLoginConfigProvider implements LoginConfigProvider {
     private static final String MODULE_CLASS = "software.amazon.msk.auth.iam.IAMLoginModule";
 
     private static final String AWS_PROFILE_NAME_KEY = "awsProfileName";
+    private static final String ROLE_ARN_KEY = "awsRoleArn";
+    private static final String ROLE_SESSION_NAME_KEY = "awsRoleSessionName";
 
     @Override
     public String getConfiguration(PropertyContext context) {
-        final String awsProfileName = context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).evaluateAttributeExpressions().getValue();
-
+        final AwsRoleSource roleSource = context.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
         final LoginConfigBuilder builder = new LoginConfigBuilder(MODULE_CLASS, REQUIRED);
 
-        if (StringUtils.isNotBlank(awsProfileName)) {
-            builder.append(AWS_PROFILE_NAME_KEY, awsProfileName);
+        if (roleSource == AwsRoleSource.SPECIFIED_PROFILE) {
+            final String awsProfileName = context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).getValue();
+            if (!StringUtils.isBlank(awsProfileName)) {
+                builder.append(AWS_PROFILE_NAME_KEY, awsProfileName);
+            }
+        }
+
+        if (roleSource == AwsRoleSource.SPECIFIED_ROLE) {
+            final String assumeRoleArn = context.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
+            final String assumeRoleSessionName = context.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
+
+            builder.append(ROLE_ARN_KEY, assumeRoleArn);
+            builder.append(ROLE_SESSION_NAME_KEY, assumeRoleSessionName);
         }
 
         return builder.build();
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
new file mode 100644
index 0000000000..669723addd
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.property;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * AWS Role Source strategy for AWS MSK IAM credentials selection
+ */
+public enum AwsRoleSource implements DescribedValue {
+    DEFAULT_PROFILE("Default Profile", "Use the default AWS credentials provider chain to locate credentials."),
+    SPECIFIED_PROFILE("Specified Profile", "Use the configured AWS Profile Name from the default credentials file."),
+    SPECIFIED_ROLE("Specified Role", "Assume a specific AWS Role using the configured Role ARN and Session Name.");
+
+    private final String displayName;
+    private final String description;
+
+    AwsRoleSource(final String displayName, final String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
new file mode 100644
index 0000000000..6e49970a9f
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class AwsMskIamLoginConfigProviderTest {
+
+    private static final String IAM_LOGIN_MODULE = "software.amazon.msk.auth.iam.IAMLoginModule";
+
+    private TestRunner runner;
+    private AwsMskIamLoginConfigProvider provider;
+
+    @BeforeEach
+    void setUp() {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        provider = new AwsMskIamLoginConfigProvider();
+    }
+
+    @Test
+    void testConfigurationWithSpecifiedProfile() {
+        runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.AWS_MSK_IAM);
+        runner.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE, "SPECIFIED_PROFILE");
+        runner.setProperty(KafkaClientComponent.AWS_PROFILE_NAME, "myProfile");
+
+        final PropertyContext context = runner.getProcessContext();
+        final String configuration = provider.getConfiguration(context);
+
+        assertNotNull(configuration);
+        assertTrue(configuration.contains(IAM_LOGIN_MODULE), "IAM Login Module not present");
+        assertTrue(configuration.contains("awsProfileName=\"myProfile\""), "awsProfileName JAAS option not present");
+    }
+
+    @Test
+    void testConfigurationWithSpecifiedRole() {
+        runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.AWS_MSK_IAM);
+        runner.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE, "SPECIFIED_ROLE");
+        runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN, "arn:aws:iam::123456789012:role/MyRole");
+        runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME, "MySession");
+
+        final PropertyContext context = runner.getProcessContext();
+        final String configuration = provider.getConfiguration(context);
+
+        assertNotNull(configuration);
+        assertTrue(configuration.contains(IAM_LOGIN_MODULE), "IAM Login Module not present");
+        assertTrue(configuration.contains("awsRoleArn=\"arn:aws:iam::123456789012:role/MyRole\""), "awsRoleArn JAAS option not present");
+        assertTrue(configuration.contains("awsRoleSessionName=\"MySession\""), "awsRoleSessionName JAAS option not present");
+    }
+}

Reply via email to