This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new eb5d172693 NIFI-10991 Add AWS MSK IAM support to Kafka processors eb5d172693 is described below commit eb5d172693fcaa24804d9e5d7ab3fd77193efc14 Author: Nandor Soma Abonyi <abonyis...@gmail.com> AuthorDate: Wed Dec 21 13:39:52 2022 +0100 NIFI-10991 Add AWS MSK IAM support to Kafka processors This closes #6846. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- nifi-assembly/NOTICE | 44 +++ .../src/main/resources/META-INF/NOTICE | 328 ++++++++++++++++++++- .../nifi-kafka-2-0-processors/pom.xml | 13 + .../kafka/pubsub/ConsumeKafkaRecord_2_0.java | 1 + .../processors/kafka/pubsub/ConsumeKafka_2_0.java | 1 + .../kafka/pubsub/PublishKafkaRecord_2_0.java | 1 + .../processors/kafka/pubsub/PublishKafka_2_0.java | 1 + .../src/main/resources/META-INF/NOTICE | 328 ++++++++++++++++++++- .../nifi-kafka-2-6-processors/pom.xml | 13 + .../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 1 + .../processors/kafka/pubsub/ConsumeKafka_2_6.java | 1 + .../kafka/pubsub/PublishKafkaRecord_2_6.java | 1 + .../processors/kafka/pubsub/PublishKafka_2_6.java | 1 + .../shared/component/KafkaClientComponent.java | 15 +- .../shared/login/AwsMskIamLoginConfigProvider.java | 46 +++ .../login/DelegatingLoginConfigProvider.java | 1 + .../KerberosUserServiceLoginConfigProvider.java | 49 +-- .../kafka/shared/login/LoginConfigBuilder.java | 74 +++++ .../shared/login/ScramLoginConfigProvider.java | 20 +- .../kafka/shared/property/KafkaClientProperty.java | 2 + .../nifi/kafka/shared/property/SaslMechanism.java | 15 +- .../provider/StandardKafkaPropertyProvider.java | 17 +- .../KafkaClientCustomValidationFunction.java | 20 ++ .../shared/login/LoginConfigBuilderTest.java} | 41 +-- nifi-nar-bundles/nifi-kafka-bundle/pom.xml | 9 +- 25 files changed, 950 insertions(+), 93 deletions(-) diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index f3529d5320..2da146f69d 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -199,6 +199,18 @@ The following binary components are provided under the Apache Software License v This project contains annotations derived from JCIP-ANNOTATIONS Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net + Apache HttpComponents Client + Copyright 1999-2020 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + Apache HttpComponents Core + Copyright 2005-2020 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + (ASLv2) Apache Jakarta HttpClient The following NOTICE information applies: Apache Jakarta HttpClient @@ -1134,6 +1146,23 @@ The following binary components are provided under the Apache Software License v Since product implements StAX API, it has dependencies to StAX API classes. + (ASLv2) AWS SDK for Java + The following NOTICE information applies: + AWS SDK for Java + Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + This product includes software developed by + Amazon Technologies, Inc (http://www.amazon.com/). + + ********************** + THIRD PARTY COMPONENTS + ********************** + This software includes third party software subject to the following copyrights: + - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty. + - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc. + + The licenses for these third party components are included in LICENSE.txt + (ASLv2) AWS SDK for Java 2.0 The following NOTICE information applies: Copyright 2010-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. @@ -1155,6 +1184,16 @@ The following binary components are provided under the Apache Software License v - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty. - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc. + (ASLv2) AWS EventStream for Java + The following NOTICE information applies: + AWS EventStream for Java + Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + (ASLv2) Amazon Ion Java + The following NOTICE information applies: + Amazon Ion Java + Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + (ASLv2) Apache Commons DBCP The following NOTICE information applies: Apache Commons DBCP @@ -2392,6 +2431,11 @@ The following binary components are provided under the Apache Software License v The Box SDK for Java Copyright 2019 Box, Inc. All rights reserved. + (ASLv2) aws-msk-iam-auth + The following NOTICE information applies: + aws-msk-iam-auth + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + ************************ Common Development and Distribution License 1.1 ************************ diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE index fc058deadf..5c63df88cb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE @@ -1,5 +1,5 @@ -nifi-kafka-1-0-nar -Copyright 2014-2022 The Apache Software Foundation +nifi-kafka-2-0-nar +Copyright 2014-2023 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). @@ -81,3 +81,327 @@ The following binary components are provided under the Apache Software License v in some artifacts (usually source distributions); but is always available from the source code management (SCM) system project uses. + (ASLv2) aws-msk-iam-auth + The following NOTICE information applies: + aws-msk-iam-auth + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + (ASLv2) AWS SDK for Java + The following NOTICE information applies: + AWS SDK for Java + Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + This product includes software developed by + Amazon Technologies, Inc (http://www.amazon.com/). + + ********************** + THIRD PARTY COMPONENTS + ********************** + This software includes third party software subject to the following copyrights: + - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty. + - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc. + + The licenses for these third party components are included in LICENSE.txt + + (ASLv2) AWS EventStream for Java + The following NOTICE information applies: + AWS EventStream for Java + Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + (ASLv2) Apache HttpComponents + The following NOTICE information applies: + Apache HttpComponents Client + Copyright 1999-2020 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + Apache HttpComponents Core + Copyright 2005-2020 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Amazon Ion Java + The following NOTICE information applies: + Amazon Ion Java + Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + (ASLv2) Joda-Time + The following NOTICE information applies: + This product includes software developed by + Joda.org (http://www.joda.org/). + + (ASLv2) The Netty Project + The following NOTICE information applies: + The Netty Project + ================= + + Please visit the Netty web site for more information: + + * https://netty.io/ + + Copyright 2014 The Netty Project + + The Netty Project licenses this file to you under the Apache License, + version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at: + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + Also, please refer to each LICENSE.<component>.txt file, which is located in + the 'license' directory of the distribution file, for the license terms of the + components that this product depends on. + + ------------------------------------------------------------------------------- + This product contains the extensions to Java Collections Framework which has + been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene: + + * LICENSE: + * license/LICENSE.jsr166y.txt (Public Domain) + * HOMEPAGE: + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ + * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ + + This product contains a modified version of Robert Harder's Public Domain + Base64 Encoder and Decoder, which can be obtained at: + + * LICENSE: + * license/LICENSE.base64.txt (Public Domain) + * HOMEPAGE: + * http://iharder.sourceforge.net/current/java/base64/ + + This product contains a modified portion of 'Webbit', an event based + WebSocket and HTTP server, which can be obtained at: + + * LICENSE: + * license/LICENSE.webbit.txt (BSD License) + * HOMEPAGE: + * https://github.com/joewalnes/webbit + + This product contains a modified portion of 'SLF4J', a simple logging + facade for Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.slf4j.txt (MIT License) + * HOMEPAGE: + * https://www.slf4j.org/ + + This product contains a modified portion of 'Apache Harmony', an open source + Java SE, which can be obtained at: + + * NOTICE: + * license/NOTICE.harmony.txt + * LICENSE: + * license/LICENSE.harmony.txt (Apache License 2.0) + * HOMEPAGE: + * https://archive.apache.org/dist/harmony/ + + This product contains a modified portion of 'jbzip2', a Java bzip2 compression + and decompression library written by Matthew J. Francis. It can be obtained at: + + * LICENSE: + * license/LICENSE.jbzip2.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jbzip2/ + + This product contains a modified portion of 'libdivsufsort', a C API library to construct + the suffix array and the Burrows-Wheeler transformed string for any input string of + a constant-size alphabet written by Yuta Mori. It can be obtained at: + + * LICENSE: + * license/LICENSE.libdivsufsort.txt (MIT License) + * HOMEPAGE: + * https://github.com/y-256/libdivsufsort + + This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM, + which can be obtained at: + + * LICENSE: + * license/LICENSE.jctools.txt (ASL2 License) + * HOMEPAGE: + * https://github.com/JCTools/JCTools + + This product optionally depends on 'JZlib', a re-implementation of zlib in + pure Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.jzlib.txt (BSD style License) + * HOMEPAGE: + * http://www.jcraft.com/jzlib/ + + This product optionally depends on 'Compress-LZF', a Java library for encoding and + decoding data in LZF format, written by Tatu Saloranta. It can be obtained at: + + * LICENSE: + * license/LICENSE.compress-lzf.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/ning/compress + + This product optionally depends on 'lz4', a LZ4 Java compression + and decompression library written by Adrien Grand. It can be obtained at: + + * LICENSE: + * license/LICENSE.lz4.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jpountz/lz4-java + + This product optionally depends on 'lzma-java', a LZMA Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.lzma-java.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jponge/lzma-java + + This product optionally depends on 'zstd-jni', a zstd-jni Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.zstd-jni.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/luben/zstd-jni + + This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression + and decompression library written by William Kinney. It can be obtained at: + + * LICENSE: + * license/LICENSE.jfastlz.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jfastlz/ + + This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data + interchange format, which can be obtained at: + + * LICENSE: + * license/LICENSE.protobuf.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/protobuf + + This product optionally depends on 'Bouncy Castle Crypto APIs' to generate + a temporary self-signed X.509 certificate when the JVM does not provide the + equivalent functionality. It can be obtained at: + + * LICENSE: + * license/LICENSE.bouncycastle.txt (MIT License) + * HOMEPAGE: + * https://www.bouncycastle.org/ + + This product optionally depends on 'Snappy', a compression library produced + by Google Inc, which can be obtained at: + + * LICENSE: + * license/LICENSE.snappy.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/snappy + + This product optionally depends on 'JBoss Marshalling', an alternative Java + serialization API, which can be obtained at: + + * LICENSE: + * license/LICENSE.jboss-marshalling.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jboss-remoting/jboss-marshalling + + This product optionally depends on 'Caliper', Google's micro- + benchmarking framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.caliper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/google/caliper + + This product optionally depends on 'Apache Commons Logging', a logging + framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-logging.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/logging/ + + This product optionally depends on 'Apache Log4J', a logging framework, which + can be obtained at: + + * LICENSE: + * license/LICENSE.log4j.txt (Apache License 2.0) + * HOMEPAGE: + * https://logging.apache.org/log4j/ + + This product optionally depends on 'Aalto XML', an ultra-high performance + non-blocking XML processor, which can be obtained at: + + * LICENSE: + * license/LICENSE.aalto-xml.txt (Apache License 2.0) + * HOMEPAGE: + * https://wiki.fasterxml.com/AaltoHome + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at: + + * LICENSE: + * license/LICENSE.hpack.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/twitter/hpack + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at: + + * LICENSE: + * license/LICENSE.hyper-hpack.txt (MIT License) + * HOMEPAGE: + * https://github.com/python-hyper/hpack/ + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at: + + * LICENSE: + * license/LICENSE.nghttp2-hpack.txt (MIT License) + * HOMEPAGE: + * https://github.com/nghttp2/nghttp2/ + + This product contains a modified portion of 'Apache Commons Lang', a Java library + provides utilities for the java.lang API, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-lang.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/proper/commons-lang/ + + + This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build. + + * LICENSE: + * license/LICENSE.mvn-wrapper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/takari/maven-wrapper + + This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS. + This private header is also used by Apple's open source + mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/). + + * LICENSE: + * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0) + * HOMEPAGE: + * https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h + + This product optionally depends on 'Brotli4j', Brotli compression and + decompression for Java., which can be obtained at: + + * LICENSE: + * license/LICENSE.brotli4j.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/hyperxpro/Brotli4j + +************************ +Creative Commons Zero license version 1.0 +************************ + +The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details. + + (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3 - http://www.reactive-streams.org/) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml index 35351ad649..63f45a8d42 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml @@ -135,5 +135,18 @@ </dependency> </dependencies> </profile> + <profile> + <id>include-kafka-aws</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>software.amazon.msk</groupId> + <artifactId>aws-msk-iam-auth</artifactId> + <version>${aws-msk-iam-auth.version}</version> + </dependency> + </dependencies> + </profile> </profiles> </project> diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java index e0e0cb5894..c8a9d7d7ec 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java @@ -266,6 +266,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor implements KafkaCl descriptors.add(SASL_USERNAME); descriptors.add(SASL_PASSWORD); descriptors.add(TOKEN_AUTHENTICATION); + descriptors.add(AWS_PROFILE_NAME); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(GROUP_ID); descriptors.add(SEPARATE_BY_KEY); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java index e28852cdc9..94bc7a7c11 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java @@ -249,6 +249,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor implements KafkaClientCo descriptors.add(SASL_USERNAME); descriptors.add(SASL_PASSWORD); descriptors.add(TOKEN_AUTHENTICATION); + descriptors.add(AWS_PROFILE_NAME); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(TOPICS); descriptors.add(TOPIC_TYPE); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java index b4f398464c..340d308c04 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java @@ -307,6 +307,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor implements KafkaPu properties.add(SASL_USERNAME); properties.add(SASL_PASSWORD); properties.add(TOKEN_AUTHENTICATION); + properties.add(AWS_PROFILE_NAME); properties.add(SSL_CONTEXT_SERVICE); properties.add(MESSAGE_KEY_FIELD); properties.add(MAX_REQUEST_SIZE); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java index 9bc312fc9e..2c6bfe8b45 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java @@ -291,6 +291,7 @@ public class PublishKafka_2_0 extends AbstractProcessor implements KafkaPublishC properties.add(SASL_USERNAME); properties.add(SASL_PASSWORD); properties.add(TOKEN_AUTHENTICATION); + properties.add(AWS_PROFILE_NAME); properties.add(SSL_CONTEXT_SERVICE); properties.add(TOPIC); properties.add(DELIVERY_GUARANTEE); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE index 0df5bf90a5..08530c98b9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE @@ -1,5 +1,5 @@ -nifi-kafka-2-5-nar -Copyright 2014-2022 The Apache Software Foundation +nifi-kafka-2-6-nar +Copyright 2014-2023 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). @@ -81,3 +81,327 @@ The following binary components are provided under the Apache Software License v in some artifacts (usually source distributions); but is always available from the source code management (SCM) system project uses. + (ASLv2) aws-msk-iam-auth + The following NOTICE information applies: + aws-msk-iam-auth + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + (ASLv2) AWS SDK for Java + The following NOTICE information applies: + AWS SDK for Java + Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + This product includes software developed by + Amazon Technologies, Inc (http://www.amazon.com/). + + ********************** + THIRD PARTY COMPONENTS + ********************** + This software includes third party software subject to the following copyrights: + - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty. + - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc. + + The licenses for these third party components are included in LICENSE.txt + + (ASLv2) AWS EventStream for Java + The following NOTICE information applies: + AWS EventStream for Java + Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + (ASLv2) Apache HttpComponents + The following NOTICE information applies: + Apache HttpComponents Client + Copyright 1999-2020 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + Apache HttpComponents Core + Copyright 2005-2020 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Amazon Ion Java + The following NOTICE information applies: + Amazon Ion Java + Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + (ASLv2) Joda-Time + The following NOTICE information applies: + This product includes software developed by + Joda.org (http://www.joda.org/). + + (ASLv2) The Netty Project + The following NOTICE information applies: + The Netty Project + ================= + + Please visit the Netty web site for more information: + + * https://netty.io/ + + Copyright 2014 The Netty Project + + The Netty Project licenses this file to you under the Apache License, + version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at: + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + Also, please refer to each LICENSE.<component>.txt file, which is located in + the 'license' directory of the distribution file, for the license terms of the + components that this product depends on. + + ------------------------------------------------------------------------------- + This product contains the extensions to Java Collections Framework which has + been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene: + + * LICENSE: + * license/LICENSE.jsr166y.txt (Public Domain) + * HOMEPAGE: + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ + * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ + + This product contains a modified version of Robert Harder's Public Domain + Base64 Encoder and Decoder, which can be obtained at: + + * LICENSE: + * license/LICENSE.base64.txt (Public Domain) + * HOMEPAGE: + * http://iharder.sourceforge.net/current/java/base64/ + + This product contains a modified portion of 'Webbit', an event based + WebSocket and HTTP server, which can be obtained at: + + * LICENSE: + * license/LICENSE.webbit.txt (BSD License) + * HOMEPAGE: + * https://github.com/joewalnes/webbit + + This product contains a modified portion of 'SLF4J', a simple logging + facade for Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.slf4j.txt (MIT License) + * HOMEPAGE: + * https://www.slf4j.org/ + + This product contains a modified portion of 'Apache Harmony', an open source + Java SE, which can be obtained at: + + * NOTICE: + * license/NOTICE.harmony.txt + * LICENSE: + * license/LICENSE.harmony.txt (Apache License 2.0) + * HOMEPAGE: + * https://archive.apache.org/dist/harmony/ + + This product contains a modified portion of 'jbzip2', a Java bzip2 compression + and decompression library written by Matthew J. Francis. It can be obtained at: + + * LICENSE: + * license/LICENSE.jbzip2.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jbzip2/ + + This product contains a modified portion of 'libdivsufsort', a C API library to construct + the suffix array and the Burrows-Wheeler transformed string for any input string of + a constant-size alphabet written by Yuta Mori. It can be obtained at: + + * LICENSE: + * license/LICENSE.libdivsufsort.txt (MIT License) + * HOMEPAGE: + * https://github.com/y-256/libdivsufsort + + This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM, + which can be obtained at: + + * LICENSE: + * license/LICENSE.jctools.txt (ASL2 License) + * HOMEPAGE: + * https://github.com/JCTools/JCTools + + This product optionally depends on 'JZlib', a re-implementation of zlib in + pure Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.jzlib.txt (BSD style License) + * HOMEPAGE: + * http://www.jcraft.com/jzlib/ + + This product optionally depends on 'Compress-LZF', a Java library for encoding and + decoding data in LZF format, written by Tatu Saloranta. It can be obtained at: + + * LICENSE: + * license/LICENSE.compress-lzf.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/ning/compress + + This product optionally depends on 'lz4', a LZ4 Java compression + and decompression library written by Adrien Grand. It can be obtained at: + + * LICENSE: + * license/LICENSE.lz4.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jpountz/lz4-java + + This product optionally depends on 'lzma-java', a LZMA Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.lzma-java.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jponge/lzma-java + + This product optionally depends on 'zstd-jni', a zstd-jni Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.zstd-jni.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/luben/zstd-jni + + This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression + and decompression library written by William Kinney. It can be obtained at: + + * LICENSE: + * license/LICENSE.jfastlz.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jfastlz/ + + This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data + interchange format, which can be obtained at: + + * LICENSE: + * license/LICENSE.protobuf.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/protobuf + + This product optionally depends on 'Bouncy Castle Crypto APIs' to generate + a temporary self-signed X.509 certificate when the JVM does not provide the + equivalent functionality. It can be obtained at: + + * LICENSE: + * license/LICENSE.bouncycastle.txt (MIT License) + * HOMEPAGE: + * https://www.bouncycastle.org/ + + This product optionally depends on 'Snappy', a compression library produced + by Google Inc, which can be obtained at: + + * LICENSE: + * license/LICENSE.snappy.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/snappy + + This product optionally depends on 'JBoss Marshalling', an alternative Java + serialization API, which can be obtained at: + + * LICENSE: + * license/LICENSE.jboss-marshalling.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jboss-remoting/jboss-marshalling + + This product optionally depends on 'Caliper', Google's micro- + benchmarking framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.caliper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/google/caliper + + This product optionally depends on 'Apache Commons Logging', a logging + framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-logging.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/logging/ + + This product optionally depends on 'Apache Log4J', a logging framework, which + can be obtained at: + + * LICENSE: + * license/LICENSE.log4j.txt (Apache License 2.0) + * HOMEPAGE: + * https://logging.apache.org/log4j/ + + This product optionally depends on 'Aalto XML', an ultra-high performance + non-blocking XML processor, which can be obtained at: + + * LICENSE: + * license/LICENSE.aalto-xml.txt (Apache License 2.0) + * HOMEPAGE: + * https://wiki.fasterxml.com/AaltoHome + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at: + + * LICENSE: + * license/LICENSE.hpack.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/twitter/hpack + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at: + + * LICENSE: + * license/LICENSE.hyper-hpack.txt (MIT License) + * HOMEPAGE: + * https://github.com/python-hyper/hpack/ + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at: + + * LICENSE: + * license/LICENSE.nghttp2-hpack.txt (MIT License) + * HOMEPAGE: + * https://github.com/nghttp2/nghttp2/ + + This product contains a modified portion of 'Apache Commons Lang', a Java library + provides utilities for the java.lang API, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-lang.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/proper/commons-lang/ + + + This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build. + + * LICENSE: + * license/LICENSE.mvn-wrapper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/takari/maven-wrapper + + This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS. + This private header is also used by Apple's open source + mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/). + + * LICENSE: + * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0) + * HOMEPAGE: + * https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h + + This product optionally depends on 'Brotli4j', Brotli compression and + decompression for Java., which can be obtained at: + + * LICENSE: + * license/LICENSE.brotli4j.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/hyperxpro/Brotli4j + +************************ +Creative Commons Zero license version 1.0 +************************ + +The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details. + + (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3 - http://www.reactive-streams.org/) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml index 255bd06cbd..5c439e96cb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml @@ -147,5 +147,18 @@ </dependency> </dependencies> </profile> + <profile> + <id>include-kafka-aws</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>software.amazon.msk</groupId> + <artifactId>aws-msk-iam-auth</artifactId> + <version>${aws-msk-iam-auth.version}</version> + </dependency> + </dependencies> + </profile> </profiles> </project> diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java index c121355cf3..025a830552 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java @@ -320,6 +320,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl descriptors.add(SASL_USERNAME); descriptors.add(SASL_PASSWORD); descriptors.add(TOKEN_AUTHENTICATION); + descriptors.add(AWS_PROFILE_NAME); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(SEPARATE_BY_KEY); descriptors.add(AUTO_OFFSET_RESET); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java index 5976175b85..076cd474bb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java @@ -274,6 +274,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo descriptors.add(SASL_USERNAME); descriptors.add(SASL_PASSWORD); descriptors.add(TOKEN_AUTHENTICATION); + descriptors.add(AWS_PROFILE_NAME); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(KEY_ATTRIBUTE_ENCODING); descriptors.add(AUTO_OFFSET_RESET); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java index 5a445beca2..8b886584c5 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java @@ -350,6 +350,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu properties.add(SASL_USERNAME); properties.add(SASL_PASSWORD); properties.add(TOKEN_AUTHENTICATION); + properties.add(AWS_PROFILE_NAME); properties.add(SSL_CONTEXT_SERVICE); properties.add(MESSAGE_KEY_FIELD); properties.add(MAX_REQUEST_SIZE); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java index 4f94446570..2a08b1efb5 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java @@ -305,6 +305,7 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC properties.add(KERBEROS_KEYTAB); properties.add(SASL_USERNAME); properties.add(SASL_PASSWORD); + properties.add(AWS_PROFILE_NAME); properties.add(TOKEN_AUTHENTICATION); properties.add(SSL_CONTEXT_SERVICE); properties.add(KEY); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java index 206bd479c9..d958997053 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java @@ -58,7 +58,7 @@ public interface KafkaClientComponent { .description("SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property") .required(true) .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .allowableValues(SaslMechanism.class) + .allowableValues(SaslMechanism.getAvailableSaslMechanisms()) .defaultValue(SaslMechanism.GSSAPI.getValue()) .build(); @@ -107,6 +107,19 @@ public interface KafkaClientComponent { ) .build(); + PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder() + .name("aws.profile.name") + .displayName("AWS Profile Name") + .description("The Amazon Web Services Profile to select when multiple profiles are available.") + .dependsOn( + SASL_MECHANISM, + SaslMechanism.AWS_MSK_IAM + ) + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl.context.service") .displayName("SSL Context Service") diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java new file mode 100644 index 0000000000..a5adfc12d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.shared.login; + +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.kafka.shared.component.KafkaClientComponent; +import org.apache.nifi.util.StringUtils; + +import static javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED; + +/** + * SASL AWS MSK IAM Login Module implementation of configuration provider + */ +public class AwsMskIamLoginConfigProvider implements LoginConfigProvider { + + private static final String MODULE_CLASS = "software.amazon.msk.auth.iam.IAMLoginModule"; + + private static final String AWS_PROFILE_NAME_KEY = "awsProfileName"; + + @Override + public String getConfiguration(PropertyContext context) { + final String awsProfileName = context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).evaluateAttributeExpressions().getValue(); + + final LoginConfigBuilder builder = new LoginConfigBuilder(MODULE_CLASS, REQUIRED); + + if (StringUtils.isNotBlank(awsProfileName)) { + builder.append(AWS_PROFILE_NAME_KEY, awsProfileName); + } + + return builder.build(); + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java index 2be8274606..307d373e6c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java @@ -36,6 +36,7 @@ public class DelegatingLoginConfigProvider implements LoginConfigProvider { PROVIDERS.put(SaslMechanism.PLAIN, new PlainLoginConfigProvider()); PROVIDERS.put(SaslMechanism.SCRAM_SHA_256, SCRAM_PROVIDER); PROVIDERS.put(SaslMechanism.SCRAM_SHA_512, SCRAM_PROVIDER); + PROVIDERS.put(SaslMechanism.AWS_MSK_IAM, new AwsMskIamLoginConfigProvider()); } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java index 257f98903e..e82207be91 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java @@ -16,38 +16,20 @@ */ package org.apache.nifi.kafka.shared.login; -import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE; - import org.apache.nifi.context.PropertyContext; import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.kerberos.SelfContainedKerberosUserService; import org.apache.nifi.security.krb.KerberosUser; import javax.security.auth.login.AppConfigurationEntry; -import java.util.LinkedHashMap; import java.util.Map; -import java.util.Objects; + +import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE; /** * Kerberos User Service Login Module implementation of configuration provider */ public class KerberosUserServiceLoginConfigProvider implements LoginConfigProvider { - private static final String SPACE = " "; - - private static final String EQUALS = "="; - - private static final String DOUBLE_QUOTE = "\""; - - private static final String SEMI_COLON = ";"; - - private static final Map<AppConfigurationEntry.LoginModuleControlFlag, String> CONTROL_FLAGS = new LinkedHashMap<>(); - - static { - CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL, "optional"); - CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, "required"); - CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE, "requisite"); - CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT, "sufficient"); - } /** * Get JAAS configuration using configured Kerberos credentials @@ -61,32 +43,11 @@ public class KerberosUserServiceLoginConfigProvider implements LoginConfigProvid final KerberosUser kerberosUser = kerberosUserService.createKerberosUser(); final AppConfigurationEntry configurationEntry = kerberosUser.getConfigurationEntry(); - final StringBuilder builder = new StringBuilder(); - - final String loginModuleName = configurationEntry.getLoginModuleName(); - builder.append(loginModuleName); - - final AppConfigurationEntry.LoginModuleControlFlag controlFlag = configurationEntry.getControlFlag(); - final String moduleControlFlag = Objects.requireNonNull(CONTROL_FLAGS.get(controlFlag), "Control Flag not found"); - builder.append(SPACE); - builder.append(moduleControlFlag); + final LoginConfigBuilder builder = new LoginConfigBuilder(configurationEntry.getLoginModuleName(), configurationEntry.getControlFlag()); final Map<String, ?> options = configurationEntry.getOptions(); - options.forEach((key, value) -> { - builder.append(SPACE); - - builder.append(key); - builder.append(EQUALS); - if (value instanceof String) { - builder.append(DOUBLE_QUOTE); - builder.append(value); - builder.append(DOUBLE_QUOTE); - } else { - builder.append(value); - } - }); + options.forEach(builder::append); - builder.append(SEMI_COLON); - return builder.toString(); + return builder.build(); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilder.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilder.java new file mode 100644 index 0000000000..4789f4fdcc --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.shared.login; + +import javax.security.auth.login.AppConfigurationEntry; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Helper class to build JAAS configuration + */ +public class LoginConfigBuilder { + + private static final Map<AppConfigurationEntry.LoginModuleControlFlag, String> CONTROL_FLAGS = new LinkedHashMap<>(); + + static { + CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL, "optional"); + CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, "required"); + CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE, "requisite"); + CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT, "sufficient"); + } + + private static final String SPACE = " "; + + private static final String EQUALS = "="; + + private static final String DOUBLE_QUOTE = "\""; + + private static final String SEMI_COLON = ";"; + + private final StringBuilder builder; + + public LoginConfigBuilder(final String moduleClassName, final AppConfigurationEntry.LoginModuleControlFlag controlFlag) { + final String moduleControlFlag = Objects.requireNonNull(CONTROL_FLAGS.get(controlFlag), "Control Flag not found"); + this.builder = new StringBuilder(moduleClassName).append(SPACE).append(moduleControlFlag); + } + + public LoginConfigBuilder append(String key, Object value) { + builder.append(SPACE); + + builder.append(key); + builder.append(EQUALS); + if (value instanceof String) { + builder.append(DOUBLE_QUOTE); + builder.append(value); + builder.append(DOUBLE_QUOTE); + } else { + builder.append(value); + } + + return this; + } + + public String build() { + builder.append(SEMI_COLON); + return builder.toString(); + } + +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java index 48290a541b..d4e51e63a2 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java @@ -19,17 +19,18 @@ package org.apache.nifi.kafka.shared.login; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.kafka.shared.component.KafkaClientComponent; +import static javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED; + /** * SASL SCRAM Login Module implementation of configuration provider */ public class ScramLoginConfigProvider implements LoginConfigProvider { private static final String MODULE_CLASS_NAME = "org.apache.kafka.common.security.scram.ScramLoginModule"; - private static final String FORMAT = "%s required username=\"%s\" password=\"%s\""; - - private static final String TOKEN_AUTH_ENABLED = "tokenauth=true"; + private static final String USERNAME_KEY = "username"; + private static final String PASSWORD_KEY = "password"; - private static final String SEMI_COLON = ";"; + private static final String TOKEN_AUTH_KEY = "tokenauth"; /** * Get JAAS configuration using configured username and password with optional token authentication @@ -39,20 +40,19 @@ public class ScramLoginConfigProvider implements LoginConfigProvider { */ @Override public String getConfiguration(final PropertyContext context) { - final StringBuilder builder = new StringBuilder(); + final LoginConfigBuilder builder = new LoginConfigBuilder(MODULE_CLASS_NAME, REQUIRED); final String username = context.getProperty(KafkaClientComponent.SASL_USERNAME).evaluateAttributeExpressions().getValue(); final String password = context.getProperty(KafkaClientComponent.SASL_PASSWORD).evaluateAttributeExpressions().getValue(); - final String moduleUsernamePassword = String.format(FORMAT, MODULE_CLASS_NAME, username, password); - builder.append(moduleUsernamePassword); + builder.append(USERNAME_KEY, username); + builder.append(PASSWORD_KEY, password); final Boolean tokenAuthenticationEnabled = context.getProperty(KafkaClientComponent.TOKEN_AUTHENTICATION).asBoolean(); if (Boolean.TRUE == tokenAuthenticationEnabled) { - builder.append(TOKEN_AUTH_ENABLED); + builder.append(TOKEN_AUTH_KEY, Boolean.TRUE); } - builder.append(SEMI_COLON); - return builder.toString(); + return builder.build(); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java index 9ab885fb0b..c1316145a0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java @@ -24,6 +24,8 @@ public enum KafkaClientProperty { SASL_LOGIN_CLASS("sasl.login.class"), + SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"), + SSL_KEYSTORE_LOCATION("ssl.keystore.location"), SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java index a9da7714e3..04e07a0dab 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java @@ -17,8 +17,10 @@ package org.apache.nifi.kafka.shared.property; import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider; import java.util.Arrays; +import java.util.EnumSet; import java.util.Optional; /** @@ -31,7 +33,10 @@ public enum SaslMechanism implements DescribedValue { SCRAM_SHA_256("SCRAM-SHA-256", "SCRAM-SHA-256", "Salted Challenge Response Authentication Mechanism using SHA-512 with username and password"), - SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response Authentication Mechanism using SHA-256 with username and password"); + SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response Authentication Mechanism using SHA-256 with username and password"), + + AWS_MSK_IAM("AWS_MSK_IAM", "AWS_MSK_IAM", "Allows to use AWS IAM for authentication and authorization against Amazon MSK clusters that have AWS IAM enabled " + + "as an authentication mechanism. The IAM credentials will be found using the AWS Default Credentials Provider Chain."); private final String value; @@ -52,6 +57,14 @@ public enum SaslMechanism implements DescribedValue { return foundSaslMechanism.orElseThrow(() -> new IllegalArgumentException(String.format("SaslMechanism value [%s] not found", value))); } + public static EnumSet<SaslMechanism> getAvailableSaslMechanisms() { + if (StandardKafkaPropertyProvider.isAwsMskIamCallbackHandlerFound()) { + return EnumSet.allOf(SaslMechanism.class); + } else { + return EnumSet.complementOf(EnumSet.of(SaslMechanism.AWS_MSK_IAM)); + } + } + @Override public String getValue() { return value; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java index fd06acb6d0..3159442bf6 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java @@ -19,6 +19,7 @@ package org.apache.nifi.kafka.shared.property.provider; import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.SASL_MECHANISM; import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.SECURITY_PROTOCOL; import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_CLIENT_CALLBACK_HANDLER_CLASS; import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_JAAS_CONFIG; import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_LOGIN_CLASS; import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION; @@ -56,6 +57,8 @@ public class StandardKafkaPropertyProvider implements KafkaPropertyProvider { private static final String SASL_GSSAPI_CUSTOM_LOGIN_CLASS = "org.apache.nifi.processors.kafka.pubsub.CustomKerberosLogin"; + public static final String SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"; + private static final LoginConfigProvider LOGIN_CONFIG_PROVIDER = new DelegatingLoginConfigProvider(); private final Set<String> clientPropertyNames; @@ -86,6 +89,8 @@ public class StandardKafkaPropertyProvider implements KafkaPropertyProvider { final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(context.getProperty(SASL_MECHANISM).getValue()); if (SaslMechanism.GSSAPI == saslMechanism && isCustomKerberosLoginFound()) { properties.put(SASL_LOGIN_CLASS.getProperty(), SASL_GSSAPI_CUSTOM_LOGIN_CLASS); + } else if (SaslMechanism.AWS_MSK_IAM == saslMechanism && isAwsMskIamCallbackHandlerFound()) { + properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty(), SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); } } } @@ -160,9 +165,17 @@ public class StandardKafkaPropertyProvider implements KafkaPropertyProvider { } } - private boolean isCustomKerberosLoginFound() { + private static boolean isCustomKerberosLoginFound() { + return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS); + } + + public static boolean isAwsMskIamCallbackHandlerFound() { + return isClassFound(SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); + } + + private static boolean isClassFound(final String className) { try { - Class.forName(SASL_GSSAPI_CUSTOM_LOGIN_CLASS); + Class.forName(className); return true; } catch (final ClassNotFoundException e) { return false; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java index 4927b8c1fe..e36264f2af 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java @@ -23,6 +23,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.kafka.shared.property.KafkaClientProperty; import org.apache.nifi.kafka.shared.property.SaslMechanism; import org.apache.nifi.kafka.shared.property.SecurityProtocol; +import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.kerberos.KerberosUserService; @@ -74,6 +75,7 @@ public class KafkaClientCustomValidationFunction implements Function<ValidationC validateKerberosServices(validationContext, results); validateKerberosCredentials(validationContext, results); validateUsernamePassword(validationContext, results); + validateAwsMskIamMechanism(validationContext, results); return results; } @@ -233,6 +235,24 @@ public class KafkaClientCustomValidationFunction implements Function<ValidationC } } + private void validateAwsMskIamMechanism(final ValidationContext validationContext, final Collection<ValidationResult> results) { + final PropertyValue saslMechanismProperty = validationContext.getProperty(SASL_MECHANISM); + if (saslMechanismProperty.isSet()) { + final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(saslMechanismProperty.getValue()); + + if (SaslMechanism.AWS_MSK_IAM == saslMechanism && !StandardKafkaPropertyProvider.isAwsMskIamCallbackHandlerFound()) { + final String explanation = String.format("[%s] required class not found: Kafka modules must be compiled with AWS MSK enabled", + StandardKafkaPropertyProvider.SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); + + results.add(new ValidationResult.Builder() + .subject(SASL_MECHANISM.getDisplayName()) + .valid(false) + .explanation(explanation) + .build()); + } + } + } + private boolean isEmpty(final String string) { return string == null || string.isEmpty(); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilderTest.java similarity index 50% copy from nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java copy to nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilderTest.java index 9ab885fb0b..f6cfe6bfd2 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilderTest.java @@ -14,37 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.kafka.shared.property; +package org.apache.nifi.kafka.shared.login; -/** - * Enumeration of Kafka Client property names without reference to Kafka libraries - */ -public enum KafkaClientProperty { - SASL_JAAS_CONFIG("sasl.jaas.config"), - - SASL_LOGIN_CLASS("sasl.login.class"), - - SSL_KEYSTORE_LOCATION("ssl.keystore.location"), - - SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), +import org.junit.jupiter.api.Test; - SSL_KEYSTORE_TYPE("ssl.keystore.type"), +import static javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED; +import static org.junit.jupiter.api.Assertions.assertEquals; - SSL_KEY_PASSWORD("ssl.key.password"), +class LoginConfigBuilderTest { - SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), + @Test + void createExampleJaasConfigTest() { + String expectedConfig = "test.class.name required booleanFlag=true numberFlag=1 stringFlag=\"string-flag\";"; - SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), - - SSL_TRUSTSTORE_TYPE("ssl.truststore.type"); - - private final String property; - - KafkaClientProperty(final String property) { - this.property = property; - } + LoginConfigBuilder builder = new LoginConfigBuilder("test.class.name", REQUIRED); + builder.append("booleanFlag", Boolean.TRUE); + builder.append("numberFlag", 1); + builder.append("stringFlag", "string-flag"); - public String getProperty() { - return property; + assertEquals(expectedConfig, builder.build()); } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml index d9080203ed..6761b04748 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml @@ -23,9 +23,10 @@ <packaging>pom</packaging> <properties> - <kafka1.0.version>1.0.2</kafka1.0.version> - <kafka2.0.version>2.0.0</kafka2.0.version> - <kafka2.6.version>2.6.3</kafka2.6.version> + <kafka1.0.version>1.0.2</kafka1.0.version> + <kafka2.0.version>2.0.0</kafka2.0.version> + <kafka2.6.version>2.6.3</kafka2.6.version> + <aws-msk-iam-auth.version>1.1.5</aws-msk-iam-auth.version> </properties> <modules> @@ -44,7 +45,7 @@ <artifactId>nifi-kafka-1-0-processors</artifactId> <version>1.20.0-SNAPSHOT</version> </dependency> - <dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-kafka-2-0-processors</artifactId> <version>1.20.0-SNAPSHOT</version>