This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new eb5d172693 NIFI-10991 Add AWS MSK IAM support to Kafka processors
eb5d172693 is described below

commit eb5d172693fcaa24804d9e5d7ab3fd77193efc14
Author: Nandor Soma Abonyi <abonyis...@gmail.com>
AuthorDate: Wed Dec 21 13:39:52 2022 +0100

    NIFI-10991 Add AWS MSK IAM support to Kafka processors
    
    This closes #6846.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 nifi-assembly/NOTICE                               |  44 +++
 .../src/main/resources/META-INF/NOTICE             | 328 ++++++++++++++++++++-
 .../nifi-kafka-2-0-processors/pom.xml              |  13 +
 .../kafka/pubsub/ConsumeKafkaRecord_2_0.java       |   1 +
 .../processors/kafka/pubsub/ConsumeKafka_2_0.java  |   1 +
 .../kafka/pubsub/PublishKafkaRecord_2_0.java       |   1 +
 .../processors/kafka/pubsub/PublishKafka_2_0.java  |   1 +
 .../src/main/resources/META-INF/NOTICE             | 328 ++++++++++++++++++++-
 .../nifi-kafka-2-6-processors/pom.xml              |  13 +
 .../kafka/pubsub/ConsumeKafkaRecord_2_6.java       |   1 +
 .../processors/kafka/pubsub/ConsumeKafka_2_6.java  |   1 +
 .../kafka/pubsub/PublishKafkaRecord_2_6.java       |   1 +
 .../processors/kafka/pubsub/PublishKafka_2_6.java  |   1 +
 .../shared/component/KafkaClientComponent.java     |  15 +-
 .../shared/login/AwsMskIamLoginConfigProvider.java |  46 +++
 .../login/DelegatingLoginConfigProvider.java       |   1 +
 .../KerberosUserServiceLoginConfigProvider.java    |  49 +--
 .../kafka/shared/login/LoginConfigBuilder.java     |  74 +++++
 .../shared/login/ScramLoginConfigProvider.java     |  20 +-
 .../kafka/shared/property/KafkaClientProperty.java |   2 +
 .../nifi/kafka/shared/property/SaslMechanism.java  |  15 +-
 .../provider/StandardKafkaPropertyProvider.java    |  17 +-
 .../KafkaClientCustomValidationFunction.java       |  20 ++
 .../shared/login/LoginConfigBuilderTest.java}      |  41 +--
 nifi-nar-bundles/nifi-kafka-bundle/pom.xml         |   9 +-
 25 files changed, 950 insertions(+), 93 deletions(-)

diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index f3529d5320..2da146f69d 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -199,6 +199,18 @@ The following binary components are provided under the 
Apache Software License v
       This project contains annotations derived from JCIP-ANNOTATIONS
       Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
 
+      Apache HttpComponents Client
+      Copyright 1999-2020 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+      Apache HttpComponents Core
+      Copyright 2005-2020 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
   (ASLv2) Apache Jakarta HttpClient
     The following NOTICE information applies:
       Apache Jakarta HttpClient
@@ -1134,6 +1146,23 @@ The following binary components are provided under the 
Apache Software License v
       Since product implements StAX API, it has dependencies to StAX API
       classes.
 
+  (ASLv2) AWS SDK for Java
+    The following NOTICE information applies:
+      AWS SDK for Java
+      Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights 
Reserved.
+
+      This product includes software developed by
+      Amazon Technologies, Inc (http://www.amazon.com/).
+
+      **********************
+      THIRD PARTY COMPONENTS
+      **********************
+      This software includes third party software subject to the following 
copyrights:
+      - XML parsing and utility functions from JetS3t - Copyright 2006-2009 
James Murty.
+      - PKCS#1 PEM encoded private key parsing and utility functions from 
oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+      The licenses for these third party components are included in LICENSE.txt
+
   (ASLv2) AWS SDK for Java 2.0
       The following NOTICE information applies:
         Copyright 2010-2021 Amazon.com, Inc. or its affiliates. All Rights 
Reserved.
@@ -1155,6 +1184,16 @@ The following binary components are provided under the 
Apache Software License v
       - XML parsing and utility functions from JetS3t - Copyright 2006-2009 
James Murty.
       - PKCS#1 PEM encoded private key parsing and utility functions from 
oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
 
+  (ASLv2) AWS EventStream for Java
+    The following NOTICE information applies:
+      AWS EventStream for Java
+      Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+  (ASLv2) Amazon Ion Java
+    The following NOTICE information applies:
+      Amazon Ion Java
+      Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights 
Reserved.
+
    (ASLv2) Apache Commons DBCP
       The following NOTICE information applies:
         Apache Commons DBCP
@@ -2392,6 +2431,11 @@ The following binary components are provided under the 
Apache Software License v
       The Box SDK for Java
       Copyright 2019 Box, Inc. All rights reserved.
 
+  (ASLv2) aws-msk-iam-auth
+    The following NOTICE information applies:
+      aws-msk-iam-auth
+      Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
 ************************
 Common Development and Distribution License 1.1
 ************************
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE
index fc058deadf..5c63df88cb 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE
@@ -1,5 +1,5 @@
-nifi-kafka-1-0-nar
-Copyright 2014-2022 The Apache Software Foundation
+nifi-kafka-2-0-nar
+Copyright 2014-2023 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
@@ -81,3 +81,327 @@ The following binary components are provided under the 
Apache Software License v
       in some artifacts (usually source distributions); but is always available
       from the source code management (SCM) system project uses.
 
+  (ASLv2) aws-msk-iam-auth
+    The following NOTICE information applies:
+      aws-msk-iam-auth
+      Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+  (ASLv2) AWS SDK for Java
+    The following NOTICE information applies:
+      AWS SDK for Java
+      Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights 
Reserved.
+
+      This product includes software developed by
+      Amazon Technologies, Inc (http://www.amazon.com/).
+
+      **********************
+      THIRD PARTY COMPONENTS
+      **********************
+      This software includes third party software subject to the following 
copyrights:
+      - XML parsing and utility functions from JetS3t - Copyright 2006-2009 
James Murty.
+      - PKCS#1 PEM encoded private key parsing and utility functions from 
oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+      The licenses for these third party components are included in LICENSE.txt
+
+  (ASLv2) AWS EventStream for Java
+    The following NOTICE information applies:
+      AWS EventStream for Java
+      Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+  (ASLv2) Apache HttpComponents
+    The following NOTICE information applies:
+      Apache HttpComponents Client
+      Copyright 1999-2020 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+      Apache HttpComponents Core
+      Copyright 2005-2020 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Amazon Ion Java
+    The following NOTICE information applies:
+      Amazon Ion Java
+      Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights 
Reserved.
+
+  (ASLv2) Joda-Time
+    The following NOTICE information applies:
+      This product includes software developed by
+      Joda.org (http://www.joda.org/).
+
+  (ASLv2) The Netty Project
+    The following NOTICE information applies:
+                                  The Netty Project
+                                  =================
+
+      Please visit the Netty web site for more information:
+
+        * https://netty.io/
+
+      Copyright 2014 The Netty Project
+
+      The Netty Project licenses this file to you under the Apache License,
+      version 2.0 (the "License"); you may not use this file except in 
compliance
+      with the License. You may obtain a copy of the License at:
+
+        https://www.apache.org/licenses/LICENSE-2.0
+
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+      WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+      License for the specific language governing permissions and limitations
+      under the License.
+
+      Also, please refer to each LICENSE.<component>.txt file, which is 
located in
+      the 'license' directory of the distribution file, for the license terms 
of the
+      components that this product depends on.
+
+      
-------------------------------------------------------------------------------
+      This product contains the extensions to Java Collections Framework which 
has
+      been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
+
+        * LICENSE:
+          * license/LICENSE.jsr166y.txt (Public Domain)
+        * HOMEPAGE:
+          * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+          * 
http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
+
+      This product contains a modified version of Robert Harder's Public Domain
+      Base64 Encoder and Decoder, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.base64.txt (Public Domain)
+        * HOMEPAGE:
+          * http://iharder.sourceforge.net/current/java/base64/
+
+      This product contains a modified portion of 'Webbit', an event based
+      WebSocket and HTTP server, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.webbit.txt (BSD License)
+        * HOMEPAGE:
+          * https://github.com/joewalnes/webbit
+
+      This product contains a modified portion of 'SLF4J', a simple logging
+      facade for Java, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.slf4j.txt (MIT License)
+        * HOMEPAGE:
+          * https://www.slf4j.org/
+
+      This product contains a modified portion of 'Apache Harmony', an open 
source
+      Java SE, which can be obtained at:
+
+        * NOTICE:
+          * license/NOTICE.harmony.txt
+        * LICENSE:
+          * license/LICENSE.harmony.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://archive.apache.org/dist/harmony/
+
+      This product contains a modified portion of 'jbzip2', a Java bzip2 
compression
+      and decompression library written by Matthew J. Francis. It can be 
obtained at:
+
+        * LICENSE:
+          * license/LICENSE.jbzip2.txt (MIT License)
+        * HOMEPAGE:
+          * https://code.google.com/p/jbzip2/
+
+      This product contains a modified portion of 'libdivsufsort', a C API 
library to construct
+      the suffix array and the Burrows-Wheeler transformed string for any 
input string of
+      a constant-size alphabet written by Yuta Mori. It can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.libdivsufsort.txt (MIT License)
+        * HOMEPAGE:
+          * https://github.com/y-256/libdivsufsort
+
+      This product contains a modified portion of Nitsan Wakart's 'JCTools', 
Java Concurrency Tools for the JVM,
+       which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.jctools.txt (ASL2 License)
+        * HOMEPAGE:
+          * https://github.com/JCTools/JCTools
+
+      This product optionally depends on 'JZlib', a re-implementation of zlib 
in
+      pure Java, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.jzlib.txt (BSD style License)
+        * HOMEPAGE:
+          * http://www.jcraft.com/jzlib/
+
+      This product optionally depends on 'Compress-LZF', a Java library for 
encoding and
+      decoding data in LZF format, written by Tatu Saloranta. It can be 
obtained at:
+
+        * LICENSE:
+          * license/LICENSE.compress-lzf.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/ning/compress
+
+      This product optionally depends on 'lz4', a LZ4 Java compression
+      and decompression library written by Adrien Grand. It can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.lz4.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/jpountz/lz4-java
+
+      This product optionally depends on 'lzma-java', a LZMA Java compression
+      and decompression library, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.lzma-java.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/jponge/lzma-java
+
+      This product optionally depends on 'zstd-jni', a zstd-jni Java 
compression
+      and decompression library, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/luben/zstd-jni
+
+      This product contains a modified portion of 'jfastlz', a Java port of 
FastLZ compression
+      and decompression library written by William Kinney. It can be obtained 
at:
+
+        * LICENSE:
+          * license/LICENSE.jfastlz.txt (MIT License)
+        * HOMEPAGE:
+          * https://code.google.com/p/jfastlz/
+
+      This product contains a modified portion of and optionally depends on 
'Protocol Buffers', Google's data
+      interchange format, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.protobuf.txt (New BSD License)
+        * HOMEPAGE:
+          * https://github.com/google/protobuf
+
+      This product optionally depends on 'Bouncy Castle Crypto APIs' to 
generate
+      a temporary self-signed X.509 certificate when the JVM does not provide 
the
+      equivalent functionality.  It can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.bouncycastle.txt (MIT License)
+        * HOMEPAGE:
+          * https://www.bouncycastle.org/
+
+      This product optionally depends on 'Snappy', a compression library 
produced
+      by Google Inc, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.snappy.txt (New BSD License)
+        * HOMEPAGE:
+          * https://github.com/google/snappy
+
+      This product optionally depends on 'JBoss Marshalling', an alternative 
Java
+      serialization API, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/jboss-remoting/jboss-marshalling
+
+      This product optionally depends on 'Caliper', Google's micro-
+      benchmarking framework, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.caliper.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/google/caliper
+
+      This product optionally depends on 'Apache Commons Logging', a logging
+      framework, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.commons-logging.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://commons.apache.org/logging/
+
+      This product optionally depends on 'Apache Log4J', a logging framework, 
which
+      can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.log4j.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://logging.apache.org/log4j/
+
+      This product optionally depends on 'Aalto XML', an ultra-high performance
+      non-blocking XML processor, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.aalto-xml.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://wiki.fasterxml.com/AaltoHome
+
+      This product contains a modified version of 'HPACK', a Java 
implementation of
+      the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.hpack.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/twitter/hpack
+
+      This product contains a modified version of 'HPACK', a Java 
implementation of
+      the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained 
at:
+
+        * LICENSE:
+          * license/LICENSE.hyper-hpack.txt (MIT License)
+        * HOMEPAGE:
+          * https://github.com/python-hyper/hpack/
+
+      This product contains a modified version of 'HPACK', a Java 
implementation of
+      the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be 
obtained at:
+
+        * LICENSE:
+          * license/LICENSE.nghttp2-hpack.txt (MIT License)
+        * HOMEPAGE:
+          * https://github.com/nghttp2/nghttp2/
+
+      This product contains a modified portion of 'Apache Commons Lang', a 
Java library
+      provides utilities for the java.lang API, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.commons-lang.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://commons.apache.org/proper/commons-lang/
+
+
+      This product contains the Maven wrapper scripts from 'Maven Wrapper', 
that provides an easy way to ensure a user has everything necessary to run the 
Maven build.
+
+        * LICENSE:
+          * license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/takari/maven-wrapper
+
+      This product contains the dnsinfo.h header file, that provides a way to 
retrieve the system DNS configuration on MacOS.
+      This private header is also used by Apple's open source
+       mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
+
+       * LICENSE:
+          * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
+        * HOMEPAGE:
+          * 
https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
+
+      This product optionally depends on 'Brotli4j', Brotli compression and
+      decompression for Java., which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.brotli4j.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/hyperxpro/Brotli4j
+
+************************
+Creative Commons Zero license version 1.0
+************************
+
+The following binary components are provided under the Creative Commons Zero 
license version 1.0.  See project link for details.
+
+    (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3 
- http://www.reactive-streams.org/)
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
index 35351ad649..63f45a8d42 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
@@ -135,5 +135,18 @@
                 </dependency>
             </dependencies>
         </profile>
+        <profile>
+            <id>include-kafka-aws</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>software.amazon.msk</groupId>
+                    <artifactId>aws-msk-iam-auth</artifactId>
+                    <version>${aws-msk-iam-auth.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
     </profiles>
 </project>
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index e0e0cb5894..c8a9d7d7ec 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -266,6 +266,7 @@ public class ConsumeKafkaRecord_2_0 extends 
AbstractProcessor implements KafkaCl
         descriptors.add(SASL_USERNAME);
         descriptors.add(SASL_PASSWORD);
         descriptors.add(TOKEN_AUTHENTICATION);
+        descriptors.add(AWS_PROFILE_NAME);
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(GROUP_ID);
         descriptors.add(SEPARATE_BY_KEY);
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index e28852cdc9..94bc7a7c11 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -249,6 +249,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor 
implements KafkaClientCo
         descriptors.add(SASL_USERNAME);
         descriptors.add(SASL_PASSWORD);
         descriptors.add(TOKEN_AUTHENTICATION);
+        descriptors.add(AWS_PROFILE_NAME);
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(TOPICS);
         descriptors.add(TOPIC_TYPE);
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index b4f398464c..340d308c04 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -307,6 +307,7 @@ public class PublishKafkaRecord_2_0 extends 
AbstractProcessor implements KafkaPu
         properties.add(SASL_USERNAME);
         properties.add(SASL_PASSWORD);
         properties.add(TOKEN_AUTHENTICATION);
+        properties.add(AWS_PROFILE_NAME);
         properties.add(SSL_CONTEXT_SERVICE);
         properties.add(MESSAGE_KEY_FIELD);
         properties.add(MAX_REQUEST_SIZE);
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index 9bc312fc9e..2c6bfe8b45 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -291,6 +291,7 @@ public class PublishKafka_2_0 extends AbstractProcessor 
implements KafkaPublishC
         properties.add(SASL_USERNAME);
         properties.add(SASL_PASSWORD);
         properties.add(TOKEN_AUTHENTICATION);
+        properties.add(AWS_PROFILE_NAME);
         properties.add(SSL_CONTEXT_SERVICE);
         properties.add(TOPIC);
         properties.add(DELIVERY_GUARANTEE);
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE
index 0df5bf90a5..08530c98b9 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE
@@ -1,5 +1,5 @@
-nifi-kafka-2-5-nar
-Copyright 2014-2022 The Apache Software Foundation
+nifi-kafka-2-6-nar
+Copyright 2014-2023 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
@@ -81,3 +81,327 @@ The following binary components are provided under the 
Apache Software License v
       in some artifacts (usually source distributions); but is always available
       from the source code management (SCM) system project uses.
 
+  (ASLv2) aws-msk-iam-auth
+    The following NOTICE information applies:
+      aws-msk-iam-auth
+      Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+  (ASLv2) AWS SDK for Java
+    The following NOTICE information applies:
+      AWS SDK for Java
+      Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights 
Reserved.
+
+      This product includes software developed by
+      Amazon Technologies, Inc (http://www.amazon.com/).
+
+      **********************
+      THIRD PARTY COMPONENTS
+      **********************
+      This software includes third party software subject to the following 
copyrights:
+      - XML parsing and utility functions from JetS3t - Copyright 2006-2009 
James Murty.
+      - PKCS#1 PEM encoded private key parsing and utility functions from 
oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+      The licenses for these third party components are included in LICENSE.txt
+
+  (ASLv2) AWS EventStream for Java
+    The following NOTICE information applies:
+      AWS EventStream for Java
+      Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+  (ASLv2) Apache HttpComponents
+    The following NOTICE information applies:
+      Apache HttpComponents Client
+      Copyright 1999-2020 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+      Apache HttpComponents Core
+      Copyright 2005-2020 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Amazon Ion Java
+    The following NOTICE information applies:
+      Amazon Ion Java
+      Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights 
Reserved.
+
+  (ASLv2) Joda-Time
+    The following NOTICE information applies:
+      This product includes software developed by
+      Joda.org (http://www.joda.org/).
+
+  (ASLv2) The Netty Project
+    The following NOTICE information applies:
+                                  The Netty Project
+                                  =================
+
+      Please visit the Netty web site for more information:
+
+        * https://netty.io/
+
+      Copyright 2014 The Netty Project
+
+      The Netty Project licenses this file to you under the Apache License,
+      version 2.0 (the "License"); you may not use this file except in 
compliance
+      with the License. You may obtain a copy of the License at:
+
+        https://www.apache.org/licenses/LICENSE-2.0
+
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+      WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+      License for the specific language governing permissions and limitations
+      under the License.
+
+      Also, please refer to each LICENSE.<component>.txt file, which is 
located in
+      the 'license' directory of the distribution file, for the license terms 
of the
+      components that this product depends on.
+
+      
-------------------------------------------------------------------------------
+      This product contains the extensions to Java Collections Framework which 
has
+      been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
+
+        * LICENSE:
+          * license/LICENSE.jsr166y.txt (Public Domain)
+        * HOMEPAGE:
+          * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+          * 
http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
+
+      This product contains a modified version of Robert Harder's Public Domain
+      Base64 Encoder and Decoder, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.base64.txt (Public Domain)
+        * HOMEPAGE:
+          * http://iharder.sourceforge.net/current/java/base64/
+
+      This product contains a modified portion of 'Webbit', an event based
+      WebSocket and HTTP server, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.webbit.txt (BSD License)
+        * HOMEPAGE:
+          * https://github.com/joewalnes/webbit
+
+      This product contains a modified portion of 'SLF4J', a simple logging
+      facade for Java, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.slf4j.txt (MIT License)
+        * HOMEPAGE:
+          * https://www.slf4j.org/
+
+      This product contains a modified portion of 'Apache Harmony', an open 
source
+      Java SE, which can be obtained at:
+
+        * NOTICE:
+          * license/NOTICE.harmony.txt
+        * LICENSE:
+          * license/LICENSE.harmony.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://archive.apache.org/dist/harmony/
+
+      This product contains a modified portion of 'jbzip2', a Java bzip2 
compression
+      and decompression library written by Matthew J. Francis. It can be 
obtained at:
+
+        * LICENSE:
+          * license/LICENSE.jbzip2.txt (MIT License)
+        * HOMEPAGE:
+          * https://code.google.com/p/jbzip2/
+
+      This product contains a modified portion of 'libdivsufsort', a C API 
library to construct
+      the suffix array and the Burrows-Wheeler transformed string for any 
input string of
+      a constant-size alphabet written by Yuta Mori. It can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.libdivsufsort.txt (MIT License)
+        * HOMEPAGE:
+          * https://github.com/y-256/libdivsufsort
+
+      This product contains a modified portion of Nitsan Wakart's 'JCTools', 
Java Concurrency Tools for the JVM,
+       which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.jctools.txt (ASL2 License)
+        * HOMEPAGE:
+          * https://github.com/JCTools/JCTools
+
+      This product optionally depends on 'JZlib', a re-implementation of zlib 
in
+      pure Java, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.jzlib.txt (BSD style License)
+        * HOMEPAGE:
+          * http://www.jcraft.com/jzlib/
+
+      This product optionally depends on 'Compress-LZF', a Java library for 
encoding and
+      decoding data in LZF format, written by Tatu Saloranta. It can be 
obtained at:
+
+        * LICENSE:
+          * license/LICENSE.compress-lzf.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/ning/compress
+
+      This product optionally depends on 'lz4', a LZ4 Java compression
+      and decompression library written by Adrien Grand. It can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.lz4.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/jpountz/lz4-java
+
+      This product optionally depends on 'lzma-java', a LZMA Java compression
+      and decompression library, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.lzma-java.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/jponge/lzma-java
+
+      This product optionally depends on 'zstd-jni', a zstd-jni Java 
compression
+      and decompression library, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/luben/zstd-jni
+
+      This product contains a modified portion of 'jfastlz', a Java port of 
FastLZ compression
+      and decompression library written by William Kinney. It can be obtained 
at:
+
+        * LICENSE:
+          * license/LICENSE.jfastlz.txt (MIT License)
+        * HOMEPAGE:
+          * https://code.google.com/p/jfastlz/
+
+      This product contains a modified portion of and optionally depends on 
'Protocol Buffers', Google's data
+      interchange format, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.protobuf.txt (New BSD License)
+        * HOMEPAGE:
+          * https://github.com/google/protobuf
+
+      This product optionally depends on 'Bouncy Castle Crypto APIs' to 
generate
+      a temporary self-signed X.509 certificate when the JVM does not provide 
the
+      equivalent functionality.  It can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.bouncycastle.txt (MIT License)
+        * HOMEPAGE:
+          * https://www.bouncycastle.org/
+
+      This product optionally depends on 'Snappy', a compression library 
produced
+      by Google Inc, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.snappy.txt (New BSD License)
+        * HOMEPAGE:
+          * https://github.com/google/snappy
+
+      This product optionally depends on 'JBoss Marshalling', an alternative 
Java
+      serialization API, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/jboss-remoting/jboss-marshalling
+
+      This product optionally depends on 'Caliper', Google's micro-
+      benchmarking framework, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.caliper.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/google/caliper
+
+      This product optionally depends on 'Apache Commons Logging', a logging
+      framework, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.commons-logging.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://commons.apache.org/logging/
+
+      This product optionally depends on 'Apache Log4J', a logging framework, 
which
+      can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.log4j.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://logging.apache.org/log4j/
+
+      This product optionally depends on 'Aalto XML', an ultra-high performance
+      non-blocking XML processor, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.aalto-xml.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://wiki.fasterxml.com/AaltoHome
+
+      This product contains a modified version of 'HPACK', a Java 
implementation of
+      the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.hpack.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/twitter/hpack
+
+      This product contains a modified version of 'HPACK', a Java 
implementation of
+      the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained 
at:
+
+        * LICENSE:
+          * license/LICENSE.hyper-hpack.txt (MIT License)
+        * HOMEPAGE:
+          * https://github.com/python-hyper/hpack/
+
+      This product contains a modified version of 'HPACK', a Java 
implementation of
+      the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be 
obtained at:
+
+        * LICENSE:
+          * license/LICENSE.nghttp2-hpack.txt (MIT License)
+        * HOMEPAGE:
+          * https://github.com/nghttp2/nghttp2/
+
+      This product contains a modified portion of 'Apache Commons Lang', a 
Java library
+      provides utilities for the java.lang API, which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.commons-lang.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://commons.apache.org/proper/commons-lang/
+
+
+      This product contains the Maven wrapper scripts from 'Maven Wrapper', 
that provides an easy way to ensure a user has everything necessary to run the 
Maven build.
+
+        * LICENSE:
+          * license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/takari/maven-wrapper
+
+      This product contains the dnsinfo.h header file, that provides a way to 
retrieve the system DNS configuration on MacOS.
+      This private header is also used by Apple's open source
+       mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
+
+       * LICENSE:
+          * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
+        * HOMEPAGE:
+          * 
https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
+
+      This product optionally depends on 'Brotli4j', Brotli compression and
+      decompression for Java., which can be obtained at:
+
+        * LICENSE:
+          * license/LICENSE.brotli4j.txt (Apache License 2.0)
+        * HOMEPAGE:
+          * https://github.com/hyperxpro/Brotli4j
+
+************************
+Creative Commons Zero license version 1.0
+************************
+
+The following binary components are provided under the Creative Commons Zero 
license version 1.0.  See project link for details.
+
+    (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3 
- http://www.reactive-streams.org/)
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml
index 255bd06cbd..5c439e96cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml
@@ -147,5 +147,18 @@
                 </dependency>
             </dependencies>
         </profile>
+        <profile>
+            <id>include-kafka-aws</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>software.amazon.msk</groupId>
+                    <artifactId>aws-msk-iam-auth</artifactId>
+                    <version>${aws-msk-iam-auth.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
     </profiles>
 </project>
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index c121355cf3..025a830552 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -320,6 +320,7 @@ public class ConsumeKafkaRecord_2_6 extends 
AbstractProcessor implements KafkaCl
         descriptors.add(SASL_USERNAME);
         descriptors.add(SASL_PASSWORD);
         descriptors.add(TOKEN_AUTHENTICATION);
+        descriptors.add(AWS_PROFILE_NAME);
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(SEPARATE_BY_KEY);
         descriptors.add(AUTO_OFFSET_RESET);
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 5976175b85..076cd474bb 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -274,6 +274,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor 
implements KafkaClientCo
         descriptors.add(SASL_USERNAME);
         descriptors.add(SASL_PASSWORD);
         descriptors.add(TOKEN_AUTHENTICATION);
+        descriptors.add(AWS_PROFILE_NAME);
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(AUTO_OFFSET_RESET);
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index 5a445beca2..8b886584c5 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -350,6 +350,7 @@ public class PublishKafkaRecord_2_6 extends 
AbstractProcessor implements KafkaPu
         properties.add(SASL_USERNAME);
         properties.add(SASL_PASSWORD);
         properties.add(TOKEN_AUTHENTICATION);
+        properties.add(AWS_PROFILE_NAME);
         properties.add(SSL_CONTEXT_SERVICE);
         properties.add(MESSAGE_KEY_FIELD);
         properties.add(MAX_REQUEST_SIZE);
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index 4f94446570..2a08b1efb5 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -305,6 +305,7 @@ public class PublishKafka_2_6 extends AbstractProcessor 
implements KafkaPublishC
         properties.add(KERBEROS_KEYTAB);
         properties.add(SASL_USERNAME);
         properties.add(SASL_PASSWORD);
+        properties.add(AWS_PROFILE_NAME);
         properties.add(TOKEN_AUTHENTICATION);
         properties.add(SSL_CONTEXT_SERVICE);
         properties.add(KEY);
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index 206bd479c9..d958997053 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -58,7 +58,7 @@ public interface KafkaClientComponent {
             .description("SASL mechanism used for authentication. Corresponds 
to Kafka Client sasl.mechanism property")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(SaslMechanism.class)
+            .allowableValues(SaslMechanism.getAvailableSaslMechanisms())
             .defaultValue(SaslMechanism.GSSAPI.getValue())
             .build();
 
@@ -107,6 +107,19 @@ public interface KafkaClientComponent {
             )
             .build();
 
+    PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder()
+            .name("aws.profile.name")
+            .displayName("AWS Profile Name")
+            .description("The Amazon Web Services Profile to select when 
multiple profiles are available.")
+            .dependsOn(
+                    SASL_MECHANISM,
+                    SaslMechanism.AWS_MSK_IAM
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("ssl.context.service")
             .displayName("SSL Context Service")
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
new file mode 100644
index 0000000000..a5adfc12d1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.util.StringUtils;
+
+import static 
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+
+/**
+ * SASL AWS MSK IAM Login Module implementation of configuration provider
+ */
+public class AwsMskIamLoginConfigProvider implements LoginConfigProvider {
+
+    private static final String MODULE_CLASS = 
"software.amazon.msk.auth.iam.IAMLoginModule";
+
+    private static final String AWS_PROFILE_NAME_KEY = "awsProfileName";
+
+    @Override
+    public String getConfiguration(PropertyContext context) {
+        final String awsProfileName = 
context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).evaluateAttributeExpressions().getValue();
+
+        final LoginConfigBuilder builder = new 
LoginConfigBuilder(MODULE_CLASS, REQUIRED);
+
+        if (StringUtils.isNotBlank(awsProfileName)) {
+            builder.append(AWS_PROFILE_NAME_KEY, awsProfileName);
+        }
+
+        return builder.build();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
index 2be8274606..307d373e6c 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
@@ -36,6 +36,7 @@ public class DelegatingLoginConfigProvider implements 
LoginConfigProvider {
         PROVIDERS.put(SaslMechanism.PLAIN, new PlainLoginConfigProvider());
         PROVIDERS.put(SaslMechanism.SCRAM_SHA_256, SCRAM_PROVIDER);
         PROVIDERS.put(SaslMechanism.SCRAM_SHA_512, SCRAM_PROVIDER);
+        PROVIDERS.put(SaslMechanism.AWS_MSK_IAM, new 
AwsMskIamLoginConfigProvider());
     }
 
     /**
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java
index 257f98903e..e82207be91 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java
@@ -16,38 +16,20 @@
  */
 package org.apache.nifi.kafka.shared.login;
 
-import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE;
-
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
 import org.apache.nifi.security.krb.KerberosUser;
 
 import javax.security.auth.login.AppConfigurationEntry;
-import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Objects;
+
+import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE;
 
 /**
  * Kerberos User Service Login Module implementation of configuration provider
  */
 public class KerberosUserServiceLoginConfigProvider implements 
LoginConfigProvider {
-    private static final String SPACE = " ";
-
-    private static final String EQUALS = "=";
-
-    private static final String DOUBLE_QUOTE = "\"";
-
-    private static final String SEMI_COLON = ";";
-
-    private static final Map<AppConfigurationEntry.LoginModuleControlFlag, 
String> CONTROL_FLAGS = new LinkedHashMap<>();
-
-    static {
-        
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL, 
"optional");
-        
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, 
"required");
-        
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE, 
"requisite");
-        
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT, 
"sufficient");
-    }
 
     /**
      * Get JAAS configuration using configured Kerberos credentials
@@ -61,32 +43,11 @@ public class KerberosUserServiceLoginConfigProvider 
implements LoginConfigProvid
         final KerberosUser kerberosUser = 
kerberosUserService.createKerberosUser();
         final AppConfigurationEntry configurationEntry = 
kerberosUser.getConfigurationEntry();
 
-        final StringBuilder builder = new StringBuilder();
-
-        final String loginModuleName = configurationEntry.getLoginModuleName();
-        builder.append(loginModuleName);
-
-        final AppConfigurationEntry.LoginModuleControlFlag controlFlag = 
configurationEntry.getControlFlag();
-        final String moduleControlFlag = 
Objects.requireNonNull(CONTROL_FLAGS.get(controlFlag), "Control Flag not 
found");
-        builder.append(SPACE);
-        builder.append(moduleControlFlag);
+        final LoginConfigBuilder builder = new 
LoginConfigBuilder(configurationEntry.getLoginModuleName(), 
configurationEntry.getControlFlag());
 
         final Map<String, ?> options = configurationEntry.getOptions();
-        options.forEach((key, value) -> {
-            builder.append(SPACE);
-
-            builder.append(key);
-            builder.append(EQUALS);
-            if (value instanceof String) {
-                builder.append(DOUBLE_QUOTE);
-                builder.append(value);
-                builder.append(DOUBLE_QUOTE);
-            } else {
-                builder.append(value);
-            }
-        });
+        options.forEach(builder::append);
 
-        builder.append(SEMI_COLON);
-        return builder.toString();
+        return builder.build();
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilder.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilder.java
new file mode 100644
index 0000000000..4789f4fdcc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Helper class to build JAAS configuration
+ */
+public class LoginConfigBuilder {
+
+    private static final Map<AppConfigurationEntry.LoginModuleControlFlag, 
String> CONTROL_FLAGS = new LinkedHashMap<>();
+
+    static {
+        
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL, 
"optional");
+        
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, 
"required");
+        
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE, 
"requisite");
+        
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT, 
"sufficient");
+    }
+
+    private static final String SPACE = " ";
+
+    private static final String EQUALS = "=";
+
+    private static final String DOUBLE_QUOTE = "\"";
+
+    private static final String SEMI_COLON = ";";
+
+    private final StringBuilder builder;
+
+    public LoginConfigBuilder(final String moduleClassName, final 
AppConfigurationEntry.LoginModuleControlFlag controlFlag) {
+        final String moduleControlFlag = 
Objects.requireNonNull(CONTROL_FLAGS.get(controlFlag), "Control Flag not 
found");
+        this.builder = new 
StringBuilder(moduleClassName).append(SPACE).append(moduleControlFlag);
+    }
+
+    public LoginConfigBuilder append(String key, Object value) {
+        builder.append(SPACE);
+
+        builder.append(key);
+        builder.append(EQUALS);
+        if (value instanceof String) {
+            builder.append(DOUBLE_QUOTE);
+            builder.append(value);
+            builder.append(DOUBLE_QUOTE);
+        } else {
+            builder.append(value);
+        }
+
+        return this;
+    }
+
+    public String build() {
+        builder.append(SEMI_COLON);
+        return builder.toString();
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java
index 48290a541b..d4e51e63a2 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java
@@ -19,17 +19,18 @@ package org.apache.nifi.kafka.shared.login;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
 
+import static 
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+
 /**
  * SASL SCRAM Login Module implementation of configuration provider
  */
 public class ScramLoginConfigProvider implements LoginConfigProvider {
     private static final String MODULE_CLASS_NAME = 
"org.apache.kafka.common.security.scram.ScramLoginModule";
 
-    private static final String FORMAT = "%s required username=\"%s\" 
password=\"%s\"";
-
-    private static final String TOKEN_AUTH_ENABLED = "tokenauth=true";
+    private static final String USERNAME_KEY = "username";
+    private static final String PASSWORD_KEY = "password";
 
-    private static final String SEMI_COLON = ";";
+    private static final String TOKEN_AUTH_KEY = "tokenauth";
 
     /**
      * Get JAAS configuration using configured username and password with 
optional token authentication
@@ -39,20 +40,19 @@ public class ScramLoginConfigProvider implements 
LoginConfigProvider {
      */
     @Override
     public String getConfiguration(final PropertyContext context) {
-        final StringBuilder builder = new StringBuilder();
+        final LoginConfigBuilder builder = new 
LoginConfigBuilder(MODULE_CLASS_NAME, REQUIRED);
 
         final String username = 
context.getProperty(KafkaClientComponent.SASL_USERNAME).evaluateAttributeExpressions().getValue();
         final String password = 
context.getProperty(KafkaClientComponent.SASL_PASSWORD).evaluateAttributeExpressions().getValue();
 
-        final String moduleUsernamePassword = String.format(FORMAT, 
MODULE_CLASS_NAME, username, password);
-        builder.append(moduleUsernamePassword);
+        builder.append(USERNAME_KEY, username);
+        builder.append(PASSWORD_KEY, password);
 
         final Boolean tokenAuthenticationEnabled = 
context.getProperty(KafkaClientComponent.TOKEN_AUTHENTICATION).asBoolean();
         if (Boolean.TRUE == tokenAuthenticationEnabled) {
-            builder.append(TOKEN_AUTH_ENABLED);
+            builder.append(TOKEN_AUTH_KEY, Boolean.TRUE);
         }
 
-        builder.append(SEMI_COLON);
-        return builder.toString();
+        return builder.build();
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
index 9ab885fb0b..c1316145a0 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
@@ -24,6 +24,8 @@ public enum KafkaClientProperty {
 
     SASL_LOGIN_CLASS("sasl.login.class"),
 
+    SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"),
+
     SSL_KEYSTORE_LOCATION("ssl.keystore.location"),
 
     SSL_KEYSTORE_PASSWORD("ssl.keystore.password"),
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
index a9da7714e3..04e07a0dab 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
@@ -17,8 +17,10 @@
 package org.apache.nifi.kafka.shared.property;
 
 import org.apache.nifi.components.DescribedValue;
+import 
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
 
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -31,7 +33,10 @@ public enum SaslMechanism implements DescribedValue {
 
     SCRAM_SHA_256("SCRAM-SHA-256", "SCRAM-SHA-256", "Salted Challenge Response 
Authentication Mechanism using SHA-512 with username and password"),
 
-    SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response 
Authentication Mechanism using SHA-256 with username and password");
+    SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response 
Authentication Mechanism using SHA-256 with username and password"),
+
+    AWS_MSK_IAM("AWS_MSK_IAM", "AWS_MSK_IAM", "Allows to use AWS IAM for 
authentication and authorization against Amazon MSK clusters that have AWS IAM 
enabled " +
+            "as an authentication mechanism. The IAM credentials will be found 
using the AWS Default Credentials Provider Chain.");
 
     private final String value;
 
@@ -52,6 +57,14 @@ public enum SaslMechanism implements DescribedValue {
         return foundSaslMechanism.orElseThrow(() -> new 
IllegalArgumentException(String.format("SaslMechanism value [%s] not found", 
value)));
     }
 
+    public static EnumSet<SaslMechanism> getAvailableSaslMechanisms() {
+        if (StandardKafkaPropertyProvider.isAwsMskIamCallbackHandlerFound()) {
+            return EnumSet.allOf(SaslMechanism.class);
+        } else {
+            return EnumSet.complementOf(EnumSet.of(SaslMechanism.AWS_MSK_IAM));
+        }
+    }
+
     @Override
     public String getValue() {
         return value;
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
index fd06acb6d0..3159442bf6 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
@@ -19,6 +19,7 @@ package org.apache.nifi.kafka.shared.property.provider;
 import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SASL_MECHANISM;
 import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SECURITY_PROTOCOL;
 import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SSL_CONTEXT_SERVICE;
+import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_CLIENT_CALLBACK_HANDLER_CLASS;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_JAAS_CONFIG;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_LOGIN_CLASS;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
@@ -56,6 +57,8 @@ public class StandardKafkaPropertyProvider implements 
KafkaPropertyProvider {
 
     private static final String SASL_GSSAPI_CUSTOM_LOGIN_CLASS = 
"org.apache.nifi.processors.kafka.pubsub.CustomKerberosLogin";
 
+    public static final String SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS 
= "software.amazon.msk.auth.iam.IAMClientCallbackHandler";
+
     private static final LoginConfigProvider LOGIN_CONFIG_PROVIDER = new 
DelegatingLoginConfigProvider();
 
     private final Set<String> clientPropertyNames;
@@ -86,6 +89,8 @@ public class StandardKafkaPropertyProvider implements 
KafkaPropertyProvider {
             final SaslMechanism saslMechanism = 
SaslMechanism.getSaslMechanism(context.getProperty(SASL_MECHANISM).getValue());
             if (SaslMechanism.GSSAPI == saslMechanism && 
isCustomKerberosLoginFound()) {
                 properties.put(SASL_LOGIN_CLASS.getProperty(), 
SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+            } else if (SaslMechanism.AWS_MSK_IAM == saslMechanism && 
isAwsMskIamCallbackHandlerFound()) {
+                
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty(), 
SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
             }
         }
     }
@@ -160,9 +165,17 @@ public class StandardKafkaPropertyProvider implements 
KafkaPropertyProvider {
         }
     }
 
-    private boolean isCustomKerberosLoginFound() {
+    private static boolean isCustomKerberosLoginFound() {
+        return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+    }
+
+    public static boolean isAwsMskIamCallbackHandlerFound() {
+        return isClassFound(SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+    }
+
+    private static boolean isClassFound(final String className) {
         try {
-            Class.forName(SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+            Class.forName(className);
             return true;
         } catch (final ClassNotFoundException e) {
             return false;
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
index 4927b8c1fe..e36264f2af 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
@@ -23,6 +23,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
 import org.apache.nifi.kafka.shared.property.SecurityProtocol;
+import 
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.kerberos.KerberosUserService;
 
@@ -74,6 +75,7 @@ public class KafkaClientCustomValidationFunction implements 
Function<ValidationC
         validateKerberosServices(validationContext, results);
         validateKerberosCredentials(validationContext, results);
         validateUsernamePassword(validationContext, results);
+        validateAwsMskIamMechanism(validationContext, results);
         return results;
     }
 
@@ -233,6 +235,24 @@ public class KafkaClientCustomValidationFunction 
implements Function<ValidationC
         }
     }
 
+    private void validateAwsMskIamMechanism(final ValidationContext 
validationContext, final Collection<ValidationResult> results) {
+        final PropertyValue saslMechanismProperty = 
validationContext.getProperty(SASL_MECHANISM);
+        if (saslMechanismProperty.isSet()) {
+            final SaslMechanism saslMechanism = 
SaslMechanism.getSaslMechanism(saslMechanismProperty.getValue());
+
+            if (SaslMechanism.AWS_MSK_IAM == saslMechanism && 
!StandardKafkaPropertyProvider.isAwsMskIamCallbackHandlerFound()) {
+                final String explanation = String.format("[%s] required class 
not found: Kafka modules must be compiled with AWS MSK enabled",
+                        
StandardKafkaPropertyProvider.SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+
+                results.add(new ValidationResult.Builder()
+                        .subject(SASL_MECHANISM.getDisplayName())
+                        .valid(false)
+                        .explanation(explanation)
+                        .build());
+            }
+        }
+    }
+
     private boolean isEmpty(final String string) {
         return string == null || string.isEmpty();
     }
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilderTest.java
similarity index 50%
copy from 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
copy to 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilderTest.java
index 9ab885fb0b..f6cfe6bfd2 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilderTest.java
@@ -14,37 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.kafka.shared.property;
+package org.apache.nifi.kafka.shared.login;
 
-/**
- * Enumeration of Kafka Client property names without reference to Kafka 
libraries
- */
-public enum KafkaClientProperty {
-    SASL_JAAS_CONFIG("sasl.jaas.config"),
-
-    SASL_LOGIN_CLASS("sasl.login.class"),
-
-    SSL_KEYSTORE_LOCATION("ssl.keystore.location"),
-
-    SSL_KEYSTORE_PASSWORD("ssl.keystore.password"),
+import org.junit.jupiter.api.Test;
 
-    SSL_KEYSTORE_TYPE("ssl.keystore.type"),
+import static 
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-    SSL_KEY_PASSWORD("ssl.key.password"),
+class LoginConfigBuilderTest {
 
-    SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"),
+    @Test
+    void createExampleJaasConfigTest() {
+        String expectedConfig = "test.class.name required booleanFlag=true 
numberFlag=1 stringFlag=\"string-flag\";";
 
-    SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"),
-
-    SSL_TRUSTSTORE_TYPE("ssl.truststore.type");
-
-    private final String property;
-
-    KafkaClientProperty(final String property) {
-        this.property = property;
-    }
+        LoginConfigBuilder builder = new LoginConfigBuilder("test.class.name", 
REQUIRED);
+        builder.append("booleanFlag", Boolean.TRUE);
+        builder.append("numberFlag", 1);
+        builder.append("stringFlag", "string-flag");
 
-    public String getProperty() {
-        return property;
+        assertEquals(expectedConfig, builder.build());
     }
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml 
b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
index d9080203ed..6761b04748 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
@@ -23,9 +23,10 @@
     <packaging>pom</packaging>
 
     <properties>
-      <kafka1.0.version>1.0.2</kafka1.0.version>
-      <kafka2.0.version>2.0.0</kafka2.0.version>
-      <kafka2.6.version>2.6.3</kafka2.6.version>
+        <kafka1.0.version>1.0.2</kafka1.0.version>
+        <kafka2.0.version>2.0.0</kafka2.0.version>
+        <kafka2.6.version>2.6.3</kafka2.6.version>
+        <aws-msk-iam-auth.version>1.1.5</aws-msk-iam-auth.version>
     </properties>
 
     <modules>
@@ -44,7 +45,7 @@
                 <artifactId>nifi-kafka-1-0-processors</artifactId>
                 <version>1.20.0-SNAPSHOT</version>
             </dependency>
-           <dependency>
+            <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-kafka-2-0-processors</artifactId>
                 <version>1.20.0-SNAPSHOT</version>

Reply via email to