This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0fde8be072 NIFI-12952 Added Amazon MSK Connection Service and NAR
0fde8be072 is described below
commit 0fde8be07270e41433d07fa1e3f940b1a08674d9
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Apr 23 14:24:26 2025 -0500
NIFI-12952 Added Amazon MSK Connection Service and NAR
Signed-off-by: Pierre Villard <[email protected]>
This closes #9896.
---
nifi-assembly/pom.xml | 6 ++
.../nifi-kafka-service-api-nar/pom.xml | 2 +-
.../pom.xml | 10 ++--
.../pom.xml | 32 +++++++++--
.../service/aws/AmazonMSKConnectionService.java | 66 ++++++++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 15 +++++
.../aws/AmazonMSKConnectionServiceTest.java | 50 ++++++++++++++++
nifi-extension-bundles/nifi-kafka-bundle/pom.xml | 6 +-
8 files changed, 173 insertions(+), 14 deletions(-)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 2c8dfea1fb..7bd2db7ee4 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -309,6 +309,12 @@ language governing permissions and limitations under the License. -->
<version>2.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kafka-service-aws-nar</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-nar</artifactId>
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml
index e184615d71..ea91dc2622 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml
@@ -33,7 +33,7 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services-api-nar</artifactId>
+ <artifactId>nifi-standard-shared-nar</artifactId>
<version>2.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws-nar/pom.xml
similarity index 84%
copy from nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml
copy to nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws-nar/pom.xml
index e184615d71..f03ef92b98 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws-nar/pom.xml
@@ -22,19 +22,19 @@
<version>2.4.0-SNAPSHOT</version>
</parent>
- <artifactId>nifi-kafka-service-api-nar</artifactId>
+ <artifactId>nifi-kafka-service-aws-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kafka-service-api</artifactId>
- <version>2.4.0-SNAPSHOT</version>
+ <artifactId>nifi-kafka-service-aws</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services-api-nar</artifactId>
- <version>2.4.0-SNAPSHOT</version>
+ <artifactId>nifi-kafka-service-api-nar</artifactId>
+ <version>${project.version}</version>
<type>nar</type>
</dependency>
</dependencies>
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
similarity index 57%
copy from nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml
copy to nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
index e184615d71..44756d9cd0 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api-nar/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
@@ -22,20 +22,40 @@
<version>2.4.0-SNAPSHOT</version>
</parent>
- <artifactId>nifi-kafka-service-api-nar</artifactId>
- <packaging>nar</packaging>
+ <artifactId>nifi-kafka-service-aws</artifactId>
+ <packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-service-api</artifactId>
- <version>2.4.0-SNAPSHOT</version>
+ <version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
+ <!-- Kafka Service implementation required for extending the Controller Service class -->
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services-api-nar</artifactId>
- <version>2.4.0-SNAPSHOT</version>
- <type>nar</type>
+ <artifactId>nifi-kafka-3-service</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Controller Service API modules required for extending Kafka3ConnectionService -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-user-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>3.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.msk</groupId>
+ <artifactId>aws-msk-iam-auth</artifactId>
+ <version>2.3.2</version>
</dependency>
</dependencies>
</project>
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
new file mode 100644
index 0000000000..76a6b56d07
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.kafka.service.Kafka3ConnectionService;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+@Tags({"AWS", "MSK", "streaming", "kafka"})
+@CapabilityDescription("Provides and manages connections to AWS MSK Kafka Brokers for producer or consumer operations.")
+public class AmazonMSKConnectionService extends Kafka3ConnectionService {
+
+ public static final PropertyDescriptor AWS_SASL_MECHANISM = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(SASL_MECHANISM)
+ .allowableValues(
+ SaslMechanism.AWS_MSK_IAM,
+ SaslMechanism.SCRAM_SHA_512
+ )
+ .defaultValue(SaslMechanism.AWS_MSK_IAM)
+ .build();
+
+ private final List<PropertyDescriptor> supportedPropertyDescriptors;
+
+ public AmazonMSKConnectionService() {
+ final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+
+ final ListIterator<PropertyDescriptor> descriptors = propertyDescriptors.listIterator();
+ while (descriptors.hasNext()) {
+ final PropertyDescriptor propertyDescriptor = descriptors.next();
+ if (SASL_MECHANISM.equals(propertyDescriptor)) {
+ descriptors.remove();
+ // Add AWS MSK properties
+ descriptors.add(AWS_SASL_MECHANISM);
+ descriptors.add(KafkaClientComponent.AWS_PROFILE_NAME);
+ }
+ }
+
+ supportedPropertyDescriptors = propertyDescriptors;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return supportedPropertyDescriptors;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000000..5dd6910c84
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.kafka.service.aws.AmazonMSKConnectionService
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
new file mode 100644
index 0000000000..c3dfc79e8b
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class AmazonMSKConnectionServiceTest {
+
+ private static final String SERVICE_ID = AmazonMSKConnectionService.class.getName();
+
+ private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+
+ private TestRunner runner;
+
+ @BeforeEach
+ void setRunner() {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ }
+
+ @Test
+ void testValidProperties() throws InitializationException {
+ final AmazonMSKConnectionService service = new AmazonMSKConnectionService();
+ runner.addControllerService(SERVICE_ID, service);
+
+ runner.setProperty(service, AmazonMSKConnectionService.BOOTSTRAP_SERVERS, BOOTSTRAP_SERVERS);
+
+ runner.assertValid(service);
+
+ runner.enableControllerService(service);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/pom.xml b/nifi-extension-bundles/nifi-kafka-bundle/pom.xml
index cee859c1c3..9a0085dc89 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/pom.xml
@@ -16,9 +16,9 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services-api-bom</artifactId>
+ <artifactId>nifi-standard-shared-bom</artifactId>
<version>2.4.0-SNAPSHOT</version>
- <relativePath>../nifi-standard-services-api-bom</relativePath>
+ <relativePath>../nifi-standard-shared-bundle/nifi-standard-shared-bom</relativePath>
</parent>
<artifactId>nifi-kafka-bundle</artifactId>
<packaging>pom</packaging>
@@ -37,6 +37,8 @@
<module>nifi-kafka-processors</module>
<module>nifi-kafka-service-api-nar</module>
<module>nifi-kafka-service-api</module>
+ <module>nifi-kafka-service-aws</module>
+ <module>nifi-kafka-service-aws-nar</module>
<module>nifi-kafka-shared</module>
</modules>