Implemented Joe's review feedback: * Renamed processors to ExecuteFlume* * Added a LICENSE and NOTICE file to the nar * Fixed the version number inconsistency in the POMs
Signed-off-by: Matt Gilman <matt.c.gil...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/286e4738 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/286e4738 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/286e4738 Branch: refs/heads/develop Commit: 286e4738b8bea6e8a386b630822a897ca07410d2 Parents: 16134a2 Author: Joey Echeverria <joe...@gmail.com> Authored: Sun Jul 12 13:33:09 2015 -0700 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Tue Jul 14 14:50:17 2015 -0400 ---------------------------------------------------------------------- .../nifi-flume-bundle/nifi-flume-nar/pom.xml | 95 ++++++ .../src/main/resources/META-INF/LICENSE | 319 +++++++++++++++++++ .../src/main/resources/META-INF/NOTICE | 145 +++++++++ .../nifi-flume-processors/pom.xml | 50 ++- .../flume/AbstractFlumeProcessor.java | 4 +- .../nifi/processors/flume/ExecuteFlumeSink.java | 144 +++++++++ .../processors/flume/ExecuteFlumeSource.java | 194 +++++++++++ .../processors/flume/FlumeSinkProcessor.java | 142 --------- .../processors/flume/FlumeSourceProcessor.java | 196 ------------ .../flume/NifiSessionFactoryChannel.java | 1 - .../processors/flume/util/FlowFileEvent.java | 9 +- .../processors/flume/ExecuteFlumeSinkTest.java | 167 ++++++++++ .../flume/ExecuteFlumeSourceTest.java | 150 +++++++++ .../flume/FlumeSinkProcessorTest.java | 167 ---------- .../flume/FlumeSourceProcessorTest.java | 150 --------- nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml | 2 +- nifi/pom.xml | 10 +- 17 files changed, 1267 insertions(+), 678 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml index c07cedf..a9bbe6f 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml @@ -26,6 +26,101 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-processors</artifactId> + <!-- The following are inherited from nifi-hadoop-libraries-nar --> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + <exclusion> + <groupId>com.thoughtworks.paranamer</groupId> + <artifactId>paranamer</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </exclusion> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> + <exclusion> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </exclusion> + <exclusion> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </exclusion> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.nifi</groupId> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000..c1a3ec4 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,319 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + +The binary distribution of this product bundles 'SUAsync Library' which is +available under a 3-Clause BSD License. + + Copyright (c) 2010 StumbleUpon, Inc. All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + - Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + - Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + - Neither the name of the StumbleUpon nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. + +The binary distribution of this product bundles 'Asynchronous HBase Client' +which is available under a 3-Clause BSD License. + + Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + - Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + - Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + - Neither the name of the StumbleUpon nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. + +The binary distribution of this product bundles 'JOpt Simple' which is +available under the MIT license. + + The MIT License + + Copyright (c) 2004-2011 Paul R. Holser, Jr. + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +The binary distribution of this product bundles 'Scala Library' under a BSD +style license. + + Copyright (c) 2002-2015 EPFL + Copyright (c) 2011-2015 Typesafe, Inc. + + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this list of + conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright notice, this list of + conditions and the following disclaimer in the documentation and/or other materials + provided with the distribution. + + Neither the name of the EPFL nor the names of its contributors may be used to endorse + or promote products derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS âAS ISâ AND ANY EXPRESS + OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER + IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..793746f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,145 @@ +nifi-social-media-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2012 The Apache Software Foundation + + (ASLv2) Apache Commons JEXL + The following NOTICE information applies: + Apache Commons JEXL + Copyright 2001-2011 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Flume + The following NOTICE information applies: + Apache Flume + Copyright 2011-2015 Apache Software Foundation + + (ASLv2) IRClib + The following NOTICE information applies: + IRClib -- A Java Internet Relay Chat library -- + Copyright (C) 2002 - 2006 Christoph Schwering <schwer...@gmail.com> + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + (ASLv2) Joda-Time + The following NOTICE information applies: + ============================================================================= + = NOTICE file corresponding to section 4d of the Apache License Version 2.0 = + ============================================================================= + This product includes software developed by + Joda.org (http://www.joda.org/). + + (ASLv2) Apache Kafka + The following NOTICE information applies: + Apache Kafka + Copyright 2012 The Apache Software Foundation. + + (ASLv2) Kite SDK + The following NOTICE information applies: + This product includes software developed by Cloudera, Inc. + (http://www.cloudera.com/). + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + This product includes software developed by + Saxonica (http://www.saxonica.com/). + + (ASLv2) Apache Thrift + The following NOTICE information applies: + Apache Thrift + Copyright 2006-2010 The Apache Software Foundation. + + (ASLv2) Yammer Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2012 Coda Hale and Yammer, Inc. + + This product includes software developed by Coda Hale and Yammer, Inc. + + This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released + with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ + + (ASLv2) Apache MINA + The following NOTICE information applies: + Apache MINA Core + Copyright 2004-2011 Apache MINA Project + + (ASLv2) The Netty Project + The following NOTICE information applies: + The Netty Project + Copyright 2011 The Netty Project + + (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3) + + (ASLv2) Parquet MR + The following NOTICE information applies: + Parquet MR + Copyright 2012 Twitter, Inc. + + This project includes code from https://github.com/lemire/JavaFastPFOR + parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java + Apache License Version 2.0 http://www.apache.org/licenses/. + (c) Daniel Lemire, http://lemire.me/en/ + + (ASLv2) Servlet Specification API (org.mortbay.jetty:servlet-api:2.5-20110124) + + (ASLv2) Twitter4J + The following NOTICE information applies: + Copyright 2007 Yusuke Yamamoto + + Twitter4J includes software from JSON.org to parse JSON response from the Twitter API. You can see the license term at http://www.JSON.org/license.html + + (ASLv2) Apache Velocity + The following NOTICE information applies: + Apache Velocity + Copyright (C) 2000-2007 The Apache Software Foundation + + (ASLv2) ZkClient + The following NOTICE information applies: + ZkClient + Copyright 2009 Stefan Groschupf \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml index 1dad25f..167aa6e 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml @@ -21,6 +21,11 @@ </parent> <artifactId>nifi-flume-processors</artifactId> <packaging>jar</packaging> + + <properties> + <flume.version>1.6.0</flume.version> + </properties> + <dependencies> <dependency> <groupId>org.apache.nifi</groupId> @@ -41,12 +46,12 @@ <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-sdk</artifactId> - <version>1.5.2</version> + <version>${flume.version}</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> - <version>1.5.2</version> + <version>${flume.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> @@ -56,27 +61,38 @@ </dependency> <!-- Flume Sources --> + <dependency> <groupId>org.apache.flume.flume-ng-sources</groupId> - <artifactId>flume-twitter-source</artifactId> - <version>1.5.2</version> + <artifactId>flume-jms-source</artifactId> + <version>${flume.version}</version> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-sources</groupId> - <artifactId>flume-jms-source</artifactId> - <version>1.5.2</version> + <artifactId>flume-kafka-source</artifactId> + <version>${flume.version}</version> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-sources</groupId> <artifactId>flume-scribe-source</artifactId> - <version>1.5.2</version> + <version>${flume.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-twitter-source</artifactId> + <version>${flume.version}</version> </dependency> <!-- Flume Sinks --> <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-dataset-sink</artifactId> + <version>${flume.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-hdfs-sink</artifactId> - <version>1.5.2</version> + <version>${flume.version}</version> </dependency> <!-- HDFS sink dependencies --> @@ -99,23 +115,33 @@ <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-hive-sink</artifactId> + <version>${flume.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-irc-sink</artifactId> - <version>1.5.2</version> + <version>${flume.version}</version> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-ng-elasticsearch-sink</artifactId> - <version>1.5.2</version> + <version>${flume.version}</version> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-ng-hbase-sink</artifactId> - <version>1.5.2</version> + <version>${flume.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-kafka-sink</artifactId> + <version>${flume.version}</version> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-ng-morphline-solr-sink</artifactId> - <version>1.5.2</version> + <version>${flume.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java index 83ae9e1..9b75047 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java @@ -77,7 +77,7 @@ public abstract class AbstractFlumeProcessor extends AbstractSessionFactoryProce public ValidationResult validate(final String subject, final String value, final ValidationContext context) { String reason = null; try { - FlumeSourceProcessor.SOURCE_FACTORY.create("NiFi Source", value); + ExecuteFlumeSource.SOURCE_FACTORY.create("NiFi Source", value); } catch (Exception ex) { reason = ex.getLocalizedMessage(); reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1); @@ -97,7 +97,7 @@ public abstract class AbstractFlumeProcessor extends AbstractSessionFactoryProce public ValidationResult validate(final String subject, final String value, final ValidationContext context) { String reason = null; try { - FlumeSinkProcessor.SINK_FACTORY.create("NiFi Sink", value); + ExecuteFlumeSink.SINK_FACTORY.create("NiFi Sink", value); } catch (Exception ex) { reason = ex.getLocalizedMessage(); reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java new file mode 100644 index 0000000..8ccb2d1 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.util.List; +import java.util.Set; +import org.apache.flume.Context; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Sink; +import org.apache.flume.conf.Configurables; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.SchedulingContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +/** + * This processor runs a Flume sink + */ +@Tags({"flume", "hadoop", "get", "sink"}) +@CapabilityDescription("Write FlowFile data to a Flume sink") +@TriggerSerially +public class ExecuteFlumeSink extends AbstractFlumeProcessor { + + public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder() + .name("Sink Type") + .description("The fully-qualified name of the Sink class") + .required(true) + .addValidator(createSinkValidator()) + .build(); + public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() + .name("Agent Name") + .description("The name of the agent used in the Flume sink configuration") + .required(true) + .defaultValue("tier1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() + .name("Sink Name") + .description("The name of the sink used in the Flume sink configuration") + .required(true) + .defaultValue("sink-1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() + .name("Flume Configuration") + .description("The Flume configuration for the sink copied from the flume.properties file") + .required(true) + .defaultValue("") + .addValidator(Validator.VALID) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); + public static final Relationship FAILURE = new Relationship.Builder().name("failure").build(); + + private List<PropertyDescriptor> descriptors; + private Set<Relationship> relationships; + + private volatile Sink sink; + private volatile NifiSinkSessionChannel channel; + + @Override + protected void init(final ProcessorInitializationContext context) { + this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); + this.relationships = ImmutableSet.of(SUCCESS, FAILURE); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(final SchedulingContext context) { + try { + channel = new NifiSinkSessionChannel(SUCCESS, FAILURE); + channel.start(); + + sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), + context.getProperty(SINK_TYPE).getValue()); + sink.setChannel(channel); + + String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); + String agentName = context.getProperty(AGENT_NAME).getValue(); + String sinkName = context.getProperty(SOURCE_NAME).getValue(); + Configurables.configure(sink, + getFlumeSinkContext(flumeConfig, agentName, sinkName)); + + sink.start(); + } catch (Throwable th) { + getLogger().error("Error creating sink", th); + throw Throwables.propagate(th); + } + } + + @OnStopped + public void stopped() { + sink.stop(); + channel.stop(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + channel.setSession(session); + try { + sink.process(); + } catch (EventDeliveryException ex) { + throw new ProcessException("Flume event delivery failed", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java new file mode 100644 index 0000000..fa02750 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.PollableSource; +import org.apache.flume.Source; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.Configurables; +import org.apache.flume.source.EventDrivenSourceRunner; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.SchedulingContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +/** + * This processor runs a Flume source + */ +@Tags({"flume", "hadoop", "get", "source"}) +@CapabilityDescription("Generate FlowFile data from a Flume source") +@TriggerSerially +public class ExecuteFlumeSource extends AbstractFlumeProcessor { + + public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() + .name("Source Type") + .description("The fully-qualified name of the Source class") + .required(true) + .addValidator(createSourceValidator()) + .build(); + public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() + .name("Agent Name") + .description("The name of the agent used in the Flume source configuration") + .required(true) + .defaultValue("tier1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() + .name("Source Name") + .description("The name of the source used in the Flume source configuration") + .required(true) + .defaultValue("src-1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() + .name("Flume Configuration") + .description("The Flume configuration for the source copied from the flume.properties file") + .required(true) + .defaultValue("") + .addValidator(Validator.VALID) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder().name("success") + .build(); + + private List<PropertyDescriptor> descriptors; + private Set<Relationship> relationships; + + private volatile Source source; + + private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS); + private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null); + private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null); + private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null); + + @Override + protected void init(final ProcessorInitializationContext context) { + this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); + this.relationships = ImmutableSet.of(SUCCESS); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(final SchedulingContext context) { + try { + source = SOURCE_FACTORY.create( + context.getProperty(SOURCE_NAME).getValue(), + context.getProperty(SOURCE_TYPE).getValue()); + + String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); + String agentName = context.getProperty(AGENT_NAME).getValue(); + String sourceName = context.getProperty(SOURCE_NAME).getValue(); + Configurables.configure(source, + getFlumeSourceContext(flumeConfig, agentName, sourceName)); + + if (source instanceof PollableSource) { + source.setChannelProcessor(new ChannelProcessor( + new NifiChannelSelector(pollableSourceChannel))); + source.start(); + } + } catch (Throwable th) { + getLogger().error("Error creating source", th); + throw Throwables.propagate(th); + } + } + + @OnStopped + public void stopped() { + if (source instanceof PollableSource) { + source.stop(); + } else { + EventDrivenSourceRunner runner = runnerRef.get(); + if (runner != null) { + runner.stop(); + runnerRef.compareAndSet(runner, null); + } + + NifiSessionFactoryChannel eventDrivenSourceChannel = eventDrivenSourceChannelRef.get(); + if (eventDrivenSourceChannel != null) { + eventDrivenSourceChannel.stop(); + eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null); + } + } + sessionFactoryRef.set(null); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + if (source instanceof PollableSource) { + super.onTrigger(context, sessionFactory); + } else if (source instanceof EventDrivenSource) { + ProcessSessionFactory old = sessionFactoryRef.getAndSet(sessionFactory); + if (old != sessionFactory) { + if (runnerRef.get() != null) { + stopped(); + } + + runnerRef.set(new EventDrivenSourceRunner()); + eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS)); + eventDrivenSourceChannelRef.get().start(); + source.setChannelProcessor(new ChannelProcessor( + new NifiChannelSelector(eventDrivenSourceChannelRef.get()))); + runnerRef.get().setSource(source); + runnerRef.get().start(); + } + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (source instanceof PollableSource) { + PollableSource pollableSource = (PollableSource) source; + try { + pollableSourceChannel.setSession(session); + pollableSource.process(); + } catch (EventDeliveryException ex) { + throw new ProcessException("Error processing pollable source", ex); + } + } else { + throw new ProcessException("Invalid source type: " + source.getClass().getSimpleName()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java deleted file mode 100644 index 2d8506d..0000000 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.flume; - -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import java.util.List; -import java.util.Set; -import org.apache.flume.Context; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.Sink; -import org.apache.flume.conf.Configurables; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.Validator; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.SchedulingContext; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; - -/** - * This processor runs a Flume sink - */ -@Tags({"flume", "hadoop", "get", "sink"}) -@CapabilityDescription("Write FlowFile data to a Flume sink") -public class FlumeSinkProcessor extends AbstractFlumeProcessor { - - public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder() - .name("Sink Type") - .description("The fully-qualified name of the Sink class") - .required(true) - .addValidator(createSinkValidator()) - .build(); - public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() - .name("Agent Name") - .description("The name of the agent used in the Flume sink configuration") - .required(true) - .defaultValue("tier1") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() - .name("Sink Name") - .description("The name of the sink used in the Flume sink configuration") - .required(true) - .defaultValue("sink-1") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() - .name("Flume Configuration") - .description("The Flume configuration for the sink copied from the flume.properties file") - .required(true) - .defaultValue("") - .addValidator(Validator.VALID) - .build(); - - public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); - public static final Relationship FAILURE = new Relationship.Builder().name("failure").build(); - - private List<PropertyDescriptor> descriptors; - private Set<Relationship> relationships; - - private volatile Sink sink; - private volatile NifiSinkSessionChannel channel; - - @Override - protected void init(final ProcessorInitializationContext context) { - this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); - this.relationships = ImmutableSet.of(SUCCESS, FAILURE); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @OnScheduled - public void onScheduled(final SchedulingContext context) { - try { - channel = new NifiSinkSessionChannel(SUCCESS, FAILURE); - channel.start(); - - sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), - context.getProperty(SINK_TYPE).getValue()); - sink.setChannel(channel); - - String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); - String agentName = context.getProperty(AGENT_NAME).getValue(); - String sinkName = context.getProperty(SOURCE_NAME).getValue(); - Configurables.configure(sink, - getFlumeSinkContext(flumeConfig, agentName, sinkName)); - - sink.start(); - } catch (Throwable th) { - getLogger().error("Error creating sink", th); - throw Throwables.propagate(th); - } - } - - @OnStopped - public void stopped() { - sink.stop(); - channel.stop(); - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - - channel.setSession(session); - try { - sink.process(); - } catch (EventDeliveryException ex) { - throw new ProcessException("Flume event delivery failed", ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java deleted file mode 100644 index 55b1f2f..0000000 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.flume; - -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.EventDrivenSource; -import org.apache.flume.PollableSource; -import org.apache.flume.Source; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.conf.Configurables; -import org.apache.flume.source.EventDrivenSourceRunner; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.Validator; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.SchedulingContext; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; - -/** - * This processor runs a Flume source - */ -@Tags({"flume", "hadoop", "get", "source"}) -@CapabilityDescription("Generate FlowFile data from a Flume source") -public class FlumeSourceProcessor extends AbstractFlumeProcessor { - - public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() - .name("Source Type") - .description("The fully-qualified name of the Source class") - .required(true) - .addValidator(createSourceValidator()) - .build(); - public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() - .name("Agent Name") - .description("The name of the agent used in the Flume source configuration") - .required(true) - .defaultValue("tier1") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() - .name("Source Name") - .description("The name of the source used in the Flume source configuration") - .required(true) - .defaultValue("src-1") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() - .name("Flume Configuration") - .description("The Flume configuration for the source copied from the flume.properties file") - .required(true) - .defaultValue("") - .addValidator(Validator.VALID) - .build(); - - public static final Relationship SUCCESS = new Relationship.Builder().name("success") - .build(); - - private List<PropertyDescriptor> descriptors; - private Set<Relationship> relationships; - - private volatile Source source; - - private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS); - private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null); - private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null); - private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null); - - @Override - protected void init(final ProcessorInitializationContext context) { - this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); - this.relationships = ImmutableSet.of(SUCCESS); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @OnScheduled - public void onScheduled(final SchedulingContext context) { - try { - source = SOURCE_FACTORY.create( - context.getProperty(SOURCE_NAME) - .getValue(), - context.getProperty(SOURCE_TYPE) - .getValue()); - - String flumeConfig = context.getProperty(FLUME_CONFIG) - .getValue(); - String agentName = context.getProperty(AGENT_NAME) - .getValue(); - String sourceName = context.getProperty(SOURCE_NAME) - .getValue(); - Configurables.configure(source, - getFlumeSourceContext(flumeConfig, agentName, sourceName)); - - if (source instanceof PollableSource) { - source.setChannelProcessor(new ChannelProcessor( - new NifiChannelSelector(pollableSourceChannel))); - source.start(); - } - } catch (Throwable th) { - getLogger() - .error("Error creating source", th); - throw Throwables.propagate(th); - } - } - - @OnStopped - public void stopped() { - if (source instanceof PollableSource) { - source.stop(); - } else { - EventDrivenSourceRunner runner = runnerRef.get(); - if (runner != null) { - runner.stop(); - runnerRef.compareAndSet(runner, null); - } - - NifiSessionFactoryChannel eventDrivenSourceChannel = eventDrivenSourceChannelRef.get(); - if (eventDrivenSourceChannel != null) { - eventDrivenSourceChannel.stop(); - eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null); - } - } - sessionFactoryRef.set(null); - } - - @Override - public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { - if (source instanceof PollableSource) { - super.onTrigger(context, sessionFactory); - } else if (source instanceof EventDrivenSource) { - ProcessSessionFactory old = sessionFactoryRef.getAndSet(sessionFactory); - if (old != sessionFactory) { - if (runnerRef.get() != null) { - stopped(); - } - - runnerRef.set(new EventDrivenSourceRunner()); - eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS)); - eventDrivenSourceChannelRef.get().start(); - source.setChannelProcessor(new ChannelProcessor( - new NifiChannelSelector(eventDrivenSourceChannelRef.get()))); - runnerRef.get().setSource(source); - runnerRef.get().start(); - } - } - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - if (source instanceof PollableSource) { - PollableSource pollableSource = (PollableSource) source; - try { - pollableSourceChannel.setSession(session); - pollableSource.process(); - } catch (EventDeliveryException ex) { - throw new ProcessException("Error processing pollable source", ex); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java index bc56587..eb31a66 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java @@ -39,7 +39,6 @@ public class NifiSessionFactoryChannel extends BasicChannelSemantics { LifecycleState lifecycleState = getLifecycleState(); if (lifecycleState == LifecycleState.STOP) { throw new ChannelFullException("Can't write to a stopped channel"); - //return null; } return new NifiTransaction(sessionFactory.createSession(), relationship); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java index 5dc97d6..fdff203 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java @@ -26,8 +26,13 @@ import org.apache.flume.Event; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.io.InputStreamCallback; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.ENTRY_DATE_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.ID_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.LAST_QUEUE_DATE_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.LINEAGE_IDENTIFIERS_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.LINEAGE_START_DATE_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.SIZE_HEADER; -import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.*; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.StreamUtils; @@ -105,7 +110,7 @@ public class FlowFileEvent implements Event { @Override public void process(InputStream in) throws IOException { try (BufferedInputStream input = new BufferedInputStream(in)) { - StreamUtils.copy(in, baos); + StreamUtils.copy(input, baos); } baos.close(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java new file mode 100644 index 0000000..6a0c40d --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + +import java.io.File; +import static org.junit.Assert.assertEquals; + +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import org.apache.commons.io.filefilter.HiddenFileFilter; +import org.apache.flume.sink.NullSink; +import org.apache.flume.source.AvroSource; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExecuteFlumeSinkTest { + + private static final Logger logger = + LoggerFactory.getLogger(ExecuteFlumeSinkTest.class); + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testValidators() { + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class); + Collection<ValidationResult> results; + ProcessContext pc; + + results = new HashSet<>(); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.debug(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required")); + } + + // non-existent class + results = new HashSet<>(); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, "invalid.class.name"); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.debug(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink")); + } + + // class doesn't implement Sink + results = new HashSet<>(); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, AvroSource.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.debug(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink")); + } + + results = new HashSet<>(); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, NullSink.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(0, results.size()); + } + + + @Test + public void testNullSink() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, NullSink.class.getName()); + try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) { + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); + runner.enqueue(inputStream, attributes); + runner.run(); + } + } + + @Test + public void testBatchSize() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, NullSink.class.getName()); + runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG, + "tier1.sinks.sink-1.batchSize = 1000\n"); + for (int i = 0; i < 100000; i++) { + runner.enqueue(String.valueOf(i).getBytes()); + } + runner.run(100); + } + + @Test + public void testHdfsSink() throws IOException { + File destDir = temp.newFolder("hdfs"); + + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, "hdfs"); + runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG, + "tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + "\n" + + "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" + + "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" + + "tier1.sinks.sink-1.serializer.appendNewline = false" + ); + try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) { + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); + runner.enqueue(inputStream, attributes); + runner.run(); + } + + File[] files = destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE); + assertEquals("Unexpected number of destination files.", 1, files.length); + File dst = files[0]; + byte[] expectedMd5; + try (InputStream md5Stream = getClass().getResourceAsStream("/testdata/records.txt")) { + expectedMd5 = FileUtils.computeMd5Digest(md5Stream); + } + byte[] actualMd5 = FileUtils.computeMd5Digest(dst); + Assert.assertArrayEquals("Destination file doesn't match source data", expectedMd5, actualMd5); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java new file mode 100644 index 0000000..924776e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.flume.sink.NullSink; +import org.apache.flume.source.AvroSource; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExecuteFlumeSourceTest { + + private static final Logger logger = LoggerFactory.getLogger(ExecuteFlumeSourceTest.class); + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testValidators() { + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class); + Collection<ValidationResult> results; + ProcessContext pc; + + results = new HashSet<>(); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.debug(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because Source Type is required")); + } + + // non-existent class + results = new HashSet<>(); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "invalid.class.name"); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.debug(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because unable to load source")); + } + + // class doesn't implement Source + results = new HashSet<>(); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, NullSink.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.debug(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because unable to create source")); + } + + results = new HashSet<>(); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, AvroSource.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(0, results.size()); + } + + @Test + public void testSequenceSource() { + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "seq"); + runner.run(); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS); + Assert.assertEquals(1, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + logger.debug(flowFile.toString()); + Assert.assertEquals(1, flowFile.getSize()); + } + } + + @Test + public void testSourceWithConfig() throws IOException { + File spoolDirectory = temp.newFolder("spooldir"); + File dst = new File(spoolDirectory, "records.txt"); + FileUtils.copyFile(getClass().getResourceAsStream("/testdata/records.txt"), dst, true, false); + + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "spooldir"); + runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG, + "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath()); + runner.run(1, false, true); + // Because the spool directory source is an event driven source, it may take some time for flow files to get + // produced. I'm willing to wait up to 5 seconds, but will bail out early if possible. If it takes longer than + // that then there is likely a bug. + int numWaits = 10; + while (runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS).size() < 4 && --numWaits > 0) { + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException ex) { + logger.warn("Sleep interrupted"); + } + } + runner.shutdown(); + runner.assertTransferCount(ExecuteFlumeSource.SUCCESS, 4); + int i = 1; + for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS)) { + flowFile.assertContentEquals("record " + i); + i++; + } + } +}