This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9ccef4cfb7 NIFI-14927 Added AWS Role Properties to AmazonMSKConnectionService (#10278)
9ccef4cfb7 is described below
commit 9ccef4cfb720a351f5d259a120d7c14ab9b62e78
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Sep 9 14:44:38 2025 +0200
NIFI-14927 Added AWS Role Properties to AmazonMSKConnectionService (#10278)
Signed-off-by: David Handermann <[email protected]>
---
.../service/aws/AmazonMSKConnectionService.java | 14 +++++
.../shared/component/KafkaClientComponent.java | 46 ++++++++++++--
.../shared/login/AwsMskIamLoginConfigProvider.java | 21 +++++--
.../nifi/kafka/shared/property/AwsRoleSource.java | 51 +++++++++++++++
.../login/AwsMskIamLoginConfigProviderTest.java | 73 ++++++++++++++++++++++
5 files changed, 197 insertions(+), 8 deletions(-)
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
index 76a6b56d07..32236d120a 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
@@ -19,8 +19,10 @@ package org.apache.nifi.kafka.service.aws;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.kafka.service.Kafka3ConnectionService;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import java.util.ArrayList;
@@ -52,7 +54,10 @@ public class AmazonMSKConnectionService extends Kafka3ConnectionService {
descriptors.remove();
// Add AWS MSK properties
descriptors.add(AWS_SASL_MECHANISM);
+ descriptors.add(KafkaClientComponent.AWS_ROLE_SOURCE);
descriptors.add(KafkaClientComponent.AWS_PROFILE_NAME);
+ descriptors.add(KafkaClientComponent.AWS_ASSUME_ROLE_ARN);
+ descriptors.add(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME);
}
}
@@ -63,4 +68,13 @@ public class AmazonMSKConnectionService extends Kafka3ConnectionService {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return supportedPropertyDescriptors;
}
+
+ @Override
+ public void migrateProperties(final PropertyConfiguration config) {
+ // For backward compatibility: if an AWS Profile Name was configured previously,
+ // set AWS Role Source to SPECIFIED_PROFILE
+ if (config.isPropertySet(KafkaClientComponent.AWS_PROFILE_NAME)) {
+ config.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE, AwsRoleSource.SPECIFIED_PROFILE.name());
+ }
+ }
}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index 771805870f..5d88cf108e 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -18,6 +18,7 @@ package org.apache.nifi.kafka.shared.component;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
@@ -107,17 +108,54 @@ public interface KafkaClientComponent {
)
.build();
+ PropertyDescriptor AWS_ROLE_SOURCE = new PropertyDescriptor.Builder()
+ .name("AWS Role Source")
+ .description("Select how AWS credentials are sourced for AWS MSK IAM: Default Profile searches standard locations," +
+ " Specified Profile selects a named profile, or Specified Role configures a Role ARN and Session Name.")
+ .required(true)
+ .allowableValues(AwsRoleSource.class)
+ .defaultValue(AwsRoleSource.DEFAULT_PROFILE)
+ .dependsOn(
+ SASL_MECHANISM,
+ SaslMechanism.AWS_MSK_IAM
+ )
+ .build();
+
PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder()
.name("aws.profile.name")
.displayName("AWS Profile Name")
.description("The Amazon Web Services Profile to select when multiple profiles are available.")
.dependsOn(
- SASL_MECHANISM,
- SaslMechanism.AWS_MSK_IAM
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.SPECIFIED_PROFILE
+ )
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ PropertyDescriptor AWS_ASSUME_ROLE_ARN = new PropertyDescriptor.Builder()
+ .name("AWS Assume Role ARN")
+ .description("The AWS Role ARN for cross-account access when using AWS MSK IAM. Used with Assume Role Session Name.")
+ .required(true)
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.SPECIFIED_ROLE
)
- .required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ PropertyDescriptor AWS_ASSUME_ROLE_SESSION_NAME = new PropertyDescriptor.Builder()
+ .name("AWS Assume Role Session Name")
+ .description("The AWS Role Session Name for cross-account access. Used in conjunction with Assume Role ARN.")
+ .required(true)
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.SPECIFIED_ROLE
+ )
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
index a5adfc12d1..a6e02a6d48 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.kafka.shared.login;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
import org.apache.nifi.util.StringUtils;
import static javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
@@ -30,15 +31,27 @@ public class AwsMskIamLoginConfigProvider implements LoginConfigProvider {
private static final String MODULE_CLASS = "software.amazon.msk.auth.iam.IAMLoginModule";
private static final String AWS_PROFILE_NAME_KEY = "awsProfileName";
+ private static final String ROLE_ARN_KEY = "awsRoleArn";
+ private static final String ROLE_SESSION_NAME_KEY = "awsRoleSessionName";
@Override
public String getConfiguration(PropertyContext context) {
- final String awsProfileName = context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).evaluateAttributeExpressions().getValue();
-
+ final AwsRoleSource roleSource = context.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
final LoginConfigBuilder builder = new LoginConfigBuilder(MODULE_CLASS, REQUIRED);
- if (StringUtils.isNotBlank(awsProfileName)) {
- builder.append(AWS_PROFILE_NAME_KEY, awsProfileName);
+ if (roleSource == AwsRoleSource.SPECIFIED_PROFILE) {
+ final String awsProfileName = context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).getValue();
+ if (!StringUtils.isBlank(awsProfileName)) {
+ builder.append(AWS_PROFILE_NAME_KEY, awsProfileName);
+ }
+ }
+
+ if (roleSource == AwsRoleSource.SPECIFIED_ROLE) {
+ final String assumeRoleArn = context.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
+ final String assumeRoleSessionName = context.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
+
+ builder.append(ROLE_ARN_KEY, assumeRoleArn);
+ builder.append(ROLE_SESSION_NAME_KEY, assumeRoleSessionName);
}
return builder.build();
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
new file mode 100644
index 0000000000..669723addd
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.property;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * AWS Role Source strategy for AWS MSK IAM credentials selection
+ */
+public enum AwsRoleSource implements DescribedValue {
+ DEFAULT_PROFILE("Default Profile", "Use the default AWS credentials provider chain to locate credentials."),
+ SPECIFIED_PROFILE("Specified Profile", "Use the configured AWS Profile Name from the default credentials file."),
+ SPECIFIED_ROLE("Specified Role", "Assume a specific AWS Role using the configured Role ARN and Session Name.");
+
+ private final String displayName;
+ private final String description;
+
+ AwsRoleSource(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
new file mode 100644
index 0000000000..6e49970a9f
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class AwsMskIamLoginConfigProviderTest {
+
+ private static final String IAM_LOGIN_MODULE = "software.amazon.msk.auth.iam.IAMLoginModule";
+
+ private TestRunner runner;
+ private AwsMskIamLoginConfigProvider provider;
+
+ @BeforeEach
+ void setUp() {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ provider = new AwsMskIamLoginConfigProvider();
+ }
+
+ @Test
+ void testConfigurationWithSpecifiedProfile() {
+ runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.AWS_MSK_IAM);
+ runner.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE, "SPECIFIED_PROFILE");
+ runner.setProperty(KafkaClientComponent.AWS_PROFILE_NAME, "myProfile");
+
+ final PropertyContext context = runner.getProcessContext();
+ final String configuration = provider.getConfiguration(context);
+
+ assertNotNull(configuration);
+ assertTrue(configuration.contains(IAM_LOGIN_MODULE), "IAM Login Module not present");
+ assertTrue(configuration.contains("awsProfileName=\"myProfile\""), "awsProfileName JAAS option not present");
+ }
+
+ @Test
+ void testConfigurationWithSpecifiedRole() {
+ runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.AWS_MSK_IAM);
+ runner.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE, "SPECIFIED_ROLE");
+ runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN, "arn:aws:iam::123456789012:role/MyRole");
+ runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME, "MySession");
+
+ final PropertyContext context = runner.getProcessContext();
+ final String configuration = provider.getConfiguration(context);
+
+ assertNotNull(configuration);
+ assertTrue(configuration.contains(IAM_LOGIN_MODULE), "IAM Login Module not present");
+ assertTrue(configuration.contains("awsRoleArn=\"arn:aws:iam::123456789012:role/MyRole\""), "awsRoleArn JAAS option not present");
+ assertTrue(configuration.contains("awsRoleSessionName=\"MySession\""), "awsRoleSessionName JAAS option not present");
+ }
+}