This is an automated email from the ASF dual-hosted git repository. rgoers pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flume-twitter.git
commit 6f08e72901160f568bb6fce6d32ec53f2fb5f8e0 Author: Ralph Goers <[email protected]> AuthorDate: Sat Mar 18 11:03:46 2023 -0700 FLUME-3455 - Move Twitter Source to its own repo --- .asf.yaml | 40 +++ CHANGELOG | 10 + LICENSE.txt | 245 +++++++++++++++ NOTICE.txt | 3 + README.md | 58 ++++ RELEASE-NOTES.txt | 26 ++ checkstyle-header.txt | 16 + findbugs-exclude-filter.xml | 31 ++ flume-twitter-dist/pom.xml | 153 +++++++++ flume-twitter-dist/src/assembly/bin.xml | 50 +++ flume-twitter-dist/src/assembly/src.xml | 45 +++ flume-twitter-source/pom.xml | 74 +++++ .../apache/flume/source/twitter/TwitterSource.java | 346 +++++++++++++++++++++ .../flume/source/twitter/TestTwitterSource.java | 132 ++++++++ flume-twitter-source/src/test/resources/log4j2.xml | 37 +++ .../src/test/resources/twitter-flume.conf | 92 ++++++ pom.xml | 286 +++++++++++++++++ 17 files changed, 1644 insertions(+) diff --git a/.asf.yaml b/.asf.yaml new file mode 100644 index 0000000..c0a61c5 --- /dev/null +++ b/.asf.yaml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# `.asf.yaml` is a branch-specific YAML configuration file for Git repositories to control features such as notifications, GitHub settings, etc. 
+# See its documentation for details: https://cwiki.apache.org/confluence/display/INFRA/Git+-+.asf.yaml+features + +notifications: + # GitHub already provides notifications for PRs and issues. + # Please don't duplicate that noise here! + commits: [email protected] + jira_options: link label +github: + description: "Apache Flume Twitter provides the Twitter Source for Apache Flume" + homepage: https://logging.apache.org/flume/ + features: + issues: true + del_branch_on_merge: true + autolink_jira: + - FLUME + labels: + - apache + - api + - java + - jvm + - library + - flume + protected_branches: + main: {} diff --git a/CHANGELOG b/CHANGELOG new file mode 100644 index 0000000..1d1fa92 --- /dev/null +++ b/CHANGELOG @@ -0,0 +1,10 @@ +Release Notes - Flume Twitter - Version 2.0.0 + +** Bug + * + +** Improvement + * [FLUME-3455] - Move the Twitter Source to its own repo. + + + diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..9fa7156 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,245 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. 
+ + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +==== + +The following files are included under the 2-Clause BSD License + +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_ar.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_bg.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_da.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_de.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_es.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_fa.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_fi.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_fr.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_hi.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_hu.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_it.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_nl.txt 
+flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_no.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_pt.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_ro.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_ru.txt +flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/resources/solr/collection1/conf/lang/stopwords_sv.txt + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/NOTICE.txt b/NOTICE.txt new file mode 100644 index 0000000..33c6200 --- /dev/null +++ b/NOTICE.txt @@ -0,0 +1,3 @@ +Apache Flume Twitter +Copyright 2022-2023 The Apache Software Foundation + diff --git a/README.md b/README.md new file mode 100644 index 0000000..e692e04 --- /dev/null +++ b/README.md @@ -0,0 +1,58 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Welcome to Apache Flume Twitter! + +Apache Flume is a distributed, reliable, and available service for efficiently +collecting, aggregating, and moving large amounts of event data. It has a simple +and flexible architecture based on streaming data flows. It is robust and fault +tolerant with tunable reliability mechanisms and many failover and recovery +mechanisms. The system is centrally managed and allows for intelligent dynamic +management. It uses a simple extensible data model that allows for online +analytic application. + +The Apache Flume Twitter module provides a source to receive data from Twitter. + +Apache Flume Twitter is open-sourced under the Apache Software Foundation License v2.0. + +## Documentation + +Documentation is included in the binary distribution under the docs directory. +In source form, it can be found in the flume-ng-doc directory.
+ +The Flume 1.x guide and FAQ are available here: + +* https://cwiki.apache.org/FLUME +* https://cwiki.apache.org/confluence/display/FLUME/Getting+Started + +## Contact us! + +* Mailing lists: https://cwiki.apache.org/confluence/display/FLUME/Mailing+Lists +* Slack channel #flume on https://the-asf.slack.com/ + +Bug and issue tracker: + +* https://github.com/apache/flume-twitter/issues + +## Compiling Flume Twitter + +Compiling Flume Twitter requires the following tools: + +* Oracle Java JDK 11 +* Apache Maven 3.x diff --git a/RELEASE-NOTES.txt b/RELEASE-NOTES.txt new file mode 100644 index 0000000..fdf94b3 --- /dev/null +++ b/RELEASE-NOTES.txt @@ -0,0 +1,26 @@ +Apache Flume Twitter 2.0.0 + +CONTENTS +1. What is Apache Flume Twitter +2. Major changes in this Release +3. How to Get Involved +4. How to Report Issues + +1. What is Apache Flume Twitter +Flume is a distributed, reliable, and available service for +efficiently collecting, aggregating, and moving large amounts of event +data. Flume Twitter allows Flume to collect data from Twitter. + +2. Major changes in this Release +For a detailed list of changes, please see the CHANGELOG file included +in this distribution. + +3. How to Get Involved +The Apache Flume project really needs and appreciates any contributions, +including documentation help, source code and feedback. If you are interested +in contributing, please visit: +https://cwiki.apache.org/confluence/display/FLUME/How+to+Contribute + +4. How to Report Issues +The Apache Flume Twitter project uses GitHub issues for issue tracking. Please see +https://github.com/apache/flume-twitter/issues diff --git a/checkstyle-header.txt b/checkstyle-header.txt new file mode 100644 index 0000000..4f33236 --- /dev/null +++ b/checkstyle-header.txt @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ diff --git a/findbugs-exclude-filter.xml b/findbugs-exclude-filter.xml new file mode 100644 index 0000000..327be31 --- /dev/null +++ b/findbugs-exclude-filter.xml @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="iso-8859-1"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!-- ===================================================================== --> +<!-- $Id: findbugs-exclude-filter.xml 773234 2009-05-09 15:27:59Z rgoers $ --> +<!-- ===================================================================== --> +<FindBugsFilter> + <!-- Enable only high priority warnings --> + <Match> + <Priority value="2"/> + </Match> + + <Match> + <Priority value="3"/> + </Match> +</FindBugsFilter> diff --git a/flume-twitter-dist/pom.xml b/flume-twitter-dist/pom.xml new file mode 100644 index 0000000..9816350 --- /dev/null +++ b/flume-twitter-dist/pom.xml @@ -0,0 +1,153 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flume</groupId> + <artifactId>flume-twitter-parent</artifactId> + <version>2.0.0</version> + </parent> + + <artifactId>flume-twitter-dist</artifactId> + <name>Flume Twitter Distribution</name> + <packaging>pom</packaging> + + <properties> + <maven.deploy.skip>true</maven.deploy.skip> + <maven.install.skip>true</maven.install.skip> + <maven.test.skip>true</maven.test.skip> + <spotless.check.skip>true</spotless.check.skip> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-twitter-source</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- calculate checksums of source release for Apache dist area --> + <plugin> + <groupId>net.nicoulaj.maven.plugins</groupId> + <artifactId>checksum-maven-plugin</artifactId> + <version>${checksum-maven-plugin.version}</version> + <executions> + <execution> + <id>calculate-checksums</id> + <goals> + <goal>files</goal> + </goals> + <!-- execute prior to maven-gpg-plugin:sign due to https://github.com/nicoulaj/checksum-maven-plugin/issues/112 --> + <phase>post-integration-test</phase> + <configuration> + <algorithms> + <algorithm>SHA-512</algorithm> + </algorithms> + <!-- https://maven.apache.org/apache-resource-bundles/#source-release-assembly-descriptor --> + <fileSets> + <fileSet> + <directory>${project.build.directory}</directory> + <includes> + <include>apache-flume-twitter-${project.version}-src.zip</include> + <include>apache-flume-twitter-${project.version}-src.tar.gz</include> + <include>apache-flume-twitter-${project.version}-bin.zip</include> + <include>apache-flume-twitter-${project.version}-bin.tar.gz</include> + </includes> + </fileSet> + </fileSets> + 
<csvSummary>false</csvSummary> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <phase>post-integration-test</phase> + <configuration> + <target> + <property name="spaces" value=" " /> + <concat destfile="${project.build.directory}/apache-flume-twitter-${project.version}-src.zip.sha512" append="yes">${spaces}apache-flume-twitter-${project.version}-src.zip</concat> + <concat destfile="${project.build.directory}/apache-flume-twitter-${project.version}-src.tar.gz.sha512" append="yes">${spaces}apache-flume-twitter-${project.version}-src.tar.gz</concat> + <concat destfile="${project.build.directory}/apache-flume-twitter-${project.version}-bin.zip.sha512" append="yes">${spaces}apache-flume-twitter-${project.version}-bin.zip</concat> + <concat destfile="${project.build.directory}/apache-flume-twitter-${project.version}-bin.tar.gz.sha512" append="yes">${spaces}apache-flume-twitter-${project.version}-bin.tar.gz</concat> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>source-release-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName>apache-flume-twitter-${project.version}</finalName> + <descriptors> + <descriptor>src/assembly/src.xml</descriptor> + </descriptors> + <tarLongFileMode>gnu</tarLongFileMode> + </configuration> + </execution> + <execution> + <id>binary</id> + <configuration> + <finalName>apache-flume-twitter-${project.version}</finalName> + <descriptors> + <descriptor>src/assembly/bin.xml</descriptor> + </descriptors> + <tarLongFileMode>gnu</tarLongFileMode> + </configuration> + <goals> + <goal>single</goal> + </goals> + <phase>package</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-gpg-plugin</artifactId> + <executions> + <execution> + <id>sign-release-artifacts</id> + <goals> + <goal>sign</goal> + </goals> + <configuration> + <keyname>${SigningUserName}</keyname> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/flume-twitter-dist/src/assembly/bin.xml b/flume-twitter-dist/src/assembly/bin.xml new file mode 100644 index 0000000..cb67c17 --- /dev/null +++ b/flume-twitter-dist/src/assembly/bin.xml @@ -0,0 +1,50 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<assembly> + <id>bin</id> + <formats> + <format>tar.gz</format> + <format>zip</format> + </formats> + <baseDirectory>apache-flume-twitter-${project.version}-bin</baseDirectory> + <includeSiteDirectory>false</includeSiteDirectory> + <moduleSets> + <moduleSet> + <useAllReactorProjects>true</useAllReactorProjects> + </moduleSet> + </moduleSets> + <dependencySets> + <dependencySet> + <includes> + <include>org.apache.flume:flume-twitter-source</include> <!-- matches the flume-twitter-source dependency declared in flume-twitter-dist/pom.xml --> + </includes> + <outputDirectory></outputDirectory> + <unpack>false</unpack> + </dependencySet> + </dependencySets> + + <fileSets> + <fileSet> + <directory>../</directory> + <includes> + <include>LICENSE.txt</include> + <include>NOTICE.txt</include> + <include>RELEASE-NOTES.txt</include> + </includes> + </fileSet> + </fileSets> +</assembly> diff --git a/flume-twitter-dist/src/assembly/src.xml b/flume-twitter-dist/src/assembly/src.xml new file mode 100644 index 0000000..5873726 --- /dev/null +++ b/flume-twitter-dist/src/assembly/src.xml @@ -0,0 +1,45 @@ +<?xml version='1.0' encoding='UTF-8'?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License.
+--> + +<assembly> + <id>src</id> + <formats> + <format>zip</format> + <format>tar.gz</format> + </formats> + <baseDirectory>apache-flume-twitter-${project.version}-src</baseDirectory> + <fileSets> + <fileSet> + <directory>../</directory> + + <excludes> + <exclude>**/target/**</exclude> + <exclude>**/.classpath</exclude> + <exclude>**/.project</exclude> + <exclude>**/.idea/**</exclude> + <exclude>**/*.iml</exclude> + <exclude>**/.settings/**</exclude> + <exclude>lib/**</exclude> + <exclude>**/.DS_Store</exclude> + <exclude>.mvn/wrapper/maven-wrapper.jar</exclude> <!-- keep the Maven wrapper jar (under .mvn/) out of the source release --> + </excludes> + </fileSet> + </fileSets> +</assembly> diff --git a/flume-twitter-source/pom.xml b/flume-twitter-source/pom.xml new file mode 100644 index 0000000..a99874a --- /dev/null +++ b/flume-twitter-source/pom.xml @@ -0,0 +1,74 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-twitter-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>2.0.0</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-twitter-source</artifactId>
+  <name>Flume Twitter Source</name>
+
+  <properties>
+    <!-- TODO fix spotbugs/pmd violations -->
+    <spotbugs.maxAllowedViolations>4</spotbugs.maxAllowedViolations>
+    <pmd.maxAllowedViolations>2</pmd.maxAllowedViolations>
+    <module.name>org.apache.flume.source.twitter</module.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.easytesting</groupId>
+      <artifactId>fest-reflect</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-media-support</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+  </build>
+</project>
diff --git a/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
new file mode 100644
index 0000000..04849bb
--- /dev/null
+++ b/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source.twitter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.conf.BatchSizeSupported;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import twitter4j.MediaEntity;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.User;
+import twitter4j.auth.AccessToken;
+
+/**
+ * Demo Flume source that connects via Streaming API to the 1% sample twitter
+ * firehose, continuously downloads tweets, converts them to Avro format and
+ * sends Avro events to a downstream Flume sink.
+ *
+ * Requires the consumer and access tokens and secrets of a Twitter developer
+ * account
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TwitterSource
+    extends AbstractSource
+    implements EventDrivenSource, Configurable, StatusListener, BatchSizeSupported {
+
+  private TwitterStream twitterStream;
+  private Schema avroSchema;
+
+  private long docCount = 0;
+  private long startTime = 0;
+  private long exceptionCount = 0;
+  private long totalTextIndexed = 0;
+  private long skippedDocs = 0;
+  private long batchEndTime = 0;
+  private final List<Record> docs = new ArrayList<Record>();
+  private final ByteArrayOutputStream serializationBuffer =
+      new ByteArrayOutputStream();
+  private DataFileWriter<GenericRecord> dataFileWriter;
+
+  private int maxBatchSize = 1000;
+  private int maxBatchDurationMillis = 1000;
+
+  private SourceCounter sourceCounter;
+
+  // Emits a literal 'Z' suffix, so the constructor pins this formatter to UTC.
+  private final SimpleDateFormat formatterTo = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+  private final DecimalFormat numFormatter = new DecimalFormat("###,###.###");
+
+  private static final int REPORT_INTERVAL = 100;
+  private static final int STATS_INTERVAL = REPORT_INTERVAL * 10;
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TwitterSource.class);
+
+  public TwitterSource() {
+    formatterTo.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
+  }
+
+  @Override
+  public void configure(Context context) {
+    String consumerKey = context.getString("consumerKey");
+    String consumerSecret = context.getString("consumerSecret");
+    String accessToken = context.getString("accessToken");
+    String accessTokenSecret = context.getString("accessTokenSecret");
+
+    twitterStream = new TwitterStreamFactory().getInstance();
+    twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
+    twitterStream.setOAuthAccessToken(new AccessToken(accessToken,
+        accessTokenSecret));
+    twitterStream.addListener(this);
+    avroSchema = createAvroSchema();
+    dataFileWriter = new DataFileWriter<GenericRecord>(
+        new GenericDatumWriter<GenericRecord>(avroSchema));
+
+    maxBatchSize = context.getInteger("maxBatchSize", maxBatchSize);
+    maxBatchDurationMillis = context.getInteger("maxBatchDurationMillis",
+        maxBatchDurationMillis);
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
+  }
+
+  @Override
+  public synchronized void start() {
+    LOGGER.info("Starting twitter source {} ...", this);
+    docCount = 0;
+    startTime = System.currentTimeMillis();
+    exceptionCount = 0;
+    totalTextIndexed = 0;
+    skippedDocs = 0;
+    batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+    twitterStream.sample();
+    LOGGER.info("Twitter source {} started.", getName());
+    // This should happen at the end of the start method, since this will
+    // change the lifecycle status of the component to tell the Flume
+    // framework that this component has started. Doing this any earlier
+    // tells the framework that the component started successfully, even
+    // if the method actually fails later.
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    LOGGER.info("Twitter source {} stopping...", getName());
+    twitterStream.shutdown();
+    super.stop();
+    LOGGER.info("Twitter source {} stopped.", getName());
+  }
+
+  public void onStatus(Status status) {
+    Record doc = extractRecord("", avroSchema, status);
+    if (doc == null) {
+      return; // skip
+    }
+    docs.add(doc);
+    if (docs.size() >= maxBatchSize ||
+        System.currentTimeMillis() >= batchEndTime) {
+      sourceCounter.addToEventReceivedCount(docs.size());
+      batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+      byte[] bytes;
+      try {
+        bytes = serializeToAvro(avroSchema, docs);
+      } catch (IOException e) {
+        sourceCounter.incrementGenericProcessingFail();
+        LOGGER.error("Exception while serializing tweet", e);
+        return; //skip
+      }
+      Event event = EventBuilder.withBody(bytes);
+      getChannelProcessor().processEvent(event); // send event to the flume sink
+      sourceCounter.addToEventAcceptedCount(docs.size()); // count before clearing the batch
+      docs.clear();
+    }
+    docCount++;
+    if ((docCount % REPORT_INTERVAL) == 0) {
+      LOGGER.info(String.format("Processed %s docs",
+          numFormatter.format(docCount)));
+    }
+    if ((docCount % STATS_INTERVAL) == 0) {
+      logStats();
+    }
+  }
+
+  private Schema createAvroSchema() {
+    Schema avroSchema = Schema.createRecord("Doc", "adoc", null, false);
+    List<Field> fields = new ArrayList<Field>();
+    fields.add(new Field("id", Schema.create(Type.STRING), null, null));
+    fields.add(new Field("user_friends_count",
+                         createOptional(Schema.create(Type.INT)),
+                         null, null));
+    fields.add(new Field("user_location",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("user_description",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("user_statuses_count",
+                         createOptional(Schema.create(Type.INT)),
+                         null, null));
+    fields.add(new Field("user_followers_count",
+                         createOptional(Schema.create(Type.INT)),
+                         null, null));
+    fields.add(new Field("user_name",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("user_screen_name",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("created_at",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("text",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("retweet_count",
+                         createOptional(Schema.create(Type.LONG)),
+                         null, null));
+    fields.add(new Field("retweeted",
+                         createOptional(Schema.create(Type.BOOLEAN)),
+                         null, null));
+    fields.add(new Field("in_reply_to_user_id",
+                         createOptional(Schema.create(Type.LONG)),
+                         null, null));
+    fields.add(new Field("source",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("in_reply_to_status_id",
+                         createOptional(Schema.create(Type.LONG)),
+                         null, null));
+    fields.add(new Field("media_url_https",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("expanded_url",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    avroSchema.setFields(fields);
+    return avroSchema;
+  }
+
+  private Record extractRecord(String idPrefix, Schema avroSchema, Status status) {
+    User user = status.getUser();
+    Record doc = new Record(avroSchema);
+
+    doc.put("id", idPrefix + status.getId());
+    doc.put("created_at", formatterTo.format(status.getCreatedAt()));
+    doc.put("retweet_count", Long.valueOf(status.getRetweetCount()));
+    doc.put("retweeted", status.isRetweet());
+    doc.put("in_reply_to_user_id", status.getInReplyToUserId());
+    doc.put("in_reply_to_status_id", status.getInReplyToStatusId());
+
+    addString(doc, "source", status.getSource());
+    addString(doc, "text", status.getText());
+
+    MediaEntity[] mediaEntities = status.getMediaEntities();
+    if (mediaEntities.length > 0) {
+      addString(doc, "media_url_https", mediaEntities[0].getMediaURLHttps());
+      addString(doc, "expanded_url", mediaEntities[0].getExpandedURL());
+    }
+
+    doc.put("user_friends_count", user.getFriendsCount());
+    doc.put("user_statuses_count", user.getStatusesCount());
+    doc.put("user_followers_count", user.getFollowersCount());
+    addString(doc, "user_location", user.getLocation());
+    addString(doc, "user_description", user.getDescription());
+    addString(doc, "user_screen_name", user.getScreenName());
+    addString(doc, "user_name", user.getName());
+    return doc;
+  }
+
+  private byte[] serializeToAvro(Schema avroSchema, List<Record> docList)
+      throws IOException {
+    serializationBuffer.reset();
+    dataFileWriter.create(avroSchema, serializationBuffer);
+    try {
+      for (Record doc2 : docList) {
+        dataFileWriter.append(doc2);
+      }
+    } finally {
+      dataFileWriter.close();
+    }
+    return serializationBuffer.toByteArray();
+  }
+
+  private Schema createOptional(Schema schema) {
+    return Schema.createUnion(Arrays.asList(
+        new Schema[] { schema, Schema.create(Type.NULL) }));
+  }
+
+  private void addString(Record doc, String avroField, String val) {
+    if (val == null) {
+      return;
+    }
+    doc.put(avroField, val);
+    totalTextIndexed += val.length();
+  }
+
+  private void logStats() {
+    double mbIndexed = totalTextIndexed / (1024 * 1024.0);
+    long seconds = (System.currentTimeMillis() - startTime) / 1000;
+    seconds = Math.max(seconds, 1);
+    LOGGER.info(String.format("Total docs indexed: %s, total skipped docs: %s",
+                numFormatter.format(docCount), numFormatter.format(skippedDocs)));
+    LOGGER.info(String.format(" %s docs/second",
+                numFormatter.format(docCount / seconds)));
+    LOGGER.info(String.format("Run took %s seconds and processed:",
+                numFormatter.format(seconds)));
+    LOGGER.info(String.format(" %s MB/sec sent to index",
+                numFormatter.format(((float) totalTextIndexed / (1024 * 1024)) / seconds)));
+    LOGGER.info(String.format(" %s MB text sent to index",
+                numFormatter.format(mbIndexed)));
+    LOGGER.info(String.format("There were %s exceptions ignored: ",
+                numFormatter.format(exceptionCount)));
+  }
+
+  public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+    // Do nothing...
+  }
+
+  public void onScrubGeo(long userId, long upToStatusId) {
+    // Do nothing...
+  }
+
+  public void onStallWarning(StallWarning warning) {
+    // Do nothing...
+  }
+
+  public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+    // Do nothing...
+  }
+
+  public void onException(Exception e) {
+    LOGGER.error("Exception while streaming tweets", e);
+  }
+
+  @Override
+  public long getBatchSize() {
+    return maxBatchSize;
+  }
+}
diff --git a/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java b/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
new file mode 100644
index 0000000..034c2e3
--- /dev/null
+++ b/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source.twitter;
+
+import static org.fest.reflect.core.Reflection.field;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.ChannelCounter;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.LoggerSink;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestTwitterSource extends Assert {
+
+  @BeforeClass
+  public static void setUp() {
+    try {
+      Assume.assumeNotNull(InetAddress.getByName("stream.twitter.com"));
+    } catch (UnknownHostException e) {
+      Assume.assumeTrue(false); // ignore Test if twitter is unreachable
+    }
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    String consumerKey = System.getProperty("twitter.consumerKey");
+    Assume.assumeNotNull(consumerKey);
+
+    String consumerSecret = System.getProperty("twitter.consumerSecret");
+    Assume.assumeNotNull(consumerSecret);
+
+    String accessToken = System.getProperty("twitter.accessToken");
+    Assume.assumeNotNull(accessToken);
+
+    String accessTokenSecret = System.getProperty("twitter.accessTokenSecret");
+    Assume.assumeNotNull(accessTokenSecret);
+
+    Context context = new Context();
+    context.put("consumerKey", consumerKey);
+    context.put("consumerSecret", consumerSecret);
+    context.put("accessToken", accessToken);
+    context.put("accessTokenSecret", accessTokenSecret);
+    context.put("maxBatchDurationMillis", "1000");
+    context.put("maxBatchSize", "1");
+
+    TwitterSource source = new TwitterSource();
+    source.configure(context);
+
+    Map<String, String> channelContext = new HashMap<>();
+    channelContext.put("capacity", "1000000");
+    channelContext.put("keep-alive", "0"); // for faster tests
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context(channelContext));
+
+    Sink sink = new LoggerSink();
+    sink.setChannel(channel);
+    sink.start();
+    DefaultSinkProcessor proc = new DefaultSinkProcessor();
+    proc.setSinks(Collections.singletonList(sink));
+    SinkRunner sinkRunner = new SinkRunner(proc);
+    sinkRunner.start();
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(Collections.singletonList(channel));
+    ChannelProcessor chp = new ChannelProcessor(rcs);
+    source.setChannelProcessor(chp);
+    source.start();
+
+    Thread.sleep(5000);
+    source.stop();
+    sinkRunner.stop();
+    sink.stop();
+
+    long successfulEvents = getTwitterCounterGroup(source).getEventAcceptedCount();
+    long receivedEvents = getTwitterCounterGroup(source).getEventReceivedCount();
+    long channelEvents = getMemoryChannelCounterGroup((MemoryChannel)channel).getEventPutAttemptCount();
+
+    assertEquals("Received vs. Success:", receivedEvents, successfulEvents);
+    assertEquals("Success vs. Channel", channelEvents, successfulEvents);
+
+  }
+
+  private SourceCounter getTwitterCounterGroup(TwitterSource source) {
+    return field("sourceCounter").ofType(SourceCounter.class).in(source).get();
+  }
+
+
+  private ChannelCounter getMemoryChannelCounterGroup(MemoryChannel source) {
+    return field("channelCounter").ofType(ChannelCounter.class).in(source).get();
+  }
+
+  @Test
+  public void testCarrotDateFormatBug() throws Exception {
+    SimpleDateFormat formatterFrom = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy");
+    formatterFrom.parse("Fri Oct 26 22:53:55 +0000 2012");
+  }
+
+}
diff --git a/flume-twitter-source/src/test/resources/log4j2.xml b/flume-twitter-source/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..0f3d384
--- /dev/null
+++ b/flume-twitter-source/src/test/resources/log4j2.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<Configuration status="OFF">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%-4r [%t] %-5p %c %x - %m%n" />
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Logger name="org.apache.flume.sink" level="INFO"/>
+    <Logger name="org.apache.solr" level="INFO"/>
+    <Logger name="org.apache.solr.morphline" level="DEBUG"/>
+    <Logger name="org.apache.solr" level="DEBUG"/>
+    <Logger name="org.apache.solr.update.processor.LogUpdateProcessor" level="WARN"/>
+    <Logger name="org.apache.solr.core.SolrCore" level="WARN"/>
+    <Logger name="org.apache.solr.search.SolrIndexSearcher" level="ERROR"/>
+    <Root level="WARN">
+      <AppenderRef ref="Console" />
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/flume-twitter-source/src/test/resources/twitter-flume.conf b/flume-twitter-source/src/test/resources/twitter-flume.conf
new file mode 100644
index 0000000..72fe4ef
--- /dev/null
+++ b/flume-twitter-source/src/test/resources/twitter-flume.conf
@@ -0,0 +1,92 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# The configuration file needs to define the sources,
+# the channels and the sinks.
+# Sources, channels and sinks are defined per agent,
+# in this case called 'agent'
+
+agent.sources = twitterSrc
+#agent.sources = httpSrc
+#agent.sources = spoolSrc
+#agent.sources = avroSrc
+agent.channels = memoryChannel
+agent.sinks = solrSink
+#agent.sinks = loggerSink
+
+agent.sources.twitterSrc.type = org.apache.flume.source.twitter.TwitterSource
+agent.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
+agent.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
+agent.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
+agent.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
+agent.sources.twitterSrc.maxBatchDurationMillis = 200
+agent.sources.twitterSrc.channels = memoryChannel
+
+agent.sources.httpSrc.type = org.apache.flume.source.http.HTTPSource
+agent.sources.httpSrc.port = 5140
+agent.sources.httpSrc.handler = org.apache.flume.sink.solr.morphline.BlobHandler
+agent.sources.httpSrc.handler.maxBlobLength = 2000000000
+#agent.sources.httpSrc.interceptors = uuidinterceptor
+#agent.sources.httpSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
+#agent.sources.httpSrc.interceptors.uuidinterceptor.headerName = id
+##agent.sources.httpSrc.interceptors.uuidinterceptor.preserveExisting = false
+##agent.sources.httpSrc.interceptors.uuidinterceptor.prefix = myhostname
+agent.sources.httpSrc.channels = memoryChannel
+
+agent.sources.spoolSrc.type = spooldir
+agent.sources.spoolSrc.spoolDir = /tmp/myspooldir
+agent.sources.spoolSrc.ignorePattern = \.
+agent.sources.spoolSrc.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
+agent.sources.spoolSrc.deserializer.maxBlobLength = 2000000000
+agent.sources.spoolSrc.batchSize = 1
+agent.sources.spoolSrc.fileHeader = true
+agent.sources.spoolSrc.fileHeaderKey = resourceName
+#agent.sources.spoolSrc.interceptors = uuidinterceptor
+#agent.sources.spoolSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
+#agent.sources.spoolSrc.interceptors.uuidinterceptor.headerName = id
+##agent.sources.spoolSrc.interceptors.uuidinterceptor.preserveExisting = false
+##agent.sources.spoolSrc.interceptors.uuidinterceptor.prefix = myhostname
+agent.sources.spoolSrc.channels = memoryChannel
+
+agent.sources.avroSrc.type = avro
+agent.sources.avroSrc.bind = 127.0.0.1
+agent.sources.avroSrc.port = 10000
+agent.sources.avroSrc.channels = memoryChannel
+agent.sources.avroSrc.interceptors = uuidinterceptor
+agent.sources.avroSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
+agent.sources.avroSrc.interceptors.uuidinterceptor.headerName = id
+#agent.sources.avroSrc.interceptors.uuidinterceptor.preserveExisting = false
+#agent.sources.avroSrc.interceptors.uuidinterceptor.prefix = myhostname
+#agent.sources.avroSrc.interceptors = morphlineinterceptor
+#agent.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
+#agent.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
+#agent.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
+
+agent.channels.memoryChannel.type = memory
+agent.channels.memoryChannel.capacity = 10000
+agent.channels.memoryChannel.transactionCapacity = 1000
+
+#agent.channels.fileChannel.type = file
+#agent.channels.fileChannel.capacity = 1000000
+#agent.channels.fileChannel.transactionCapacity = 1000
+#agent.channels.fileChannel.write-timeout = 1
+
+agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
+agent.sinks.solrSink.channel = memoryChannel
+#agent.sinks.solrSink.batchSize = 1000
+#agent.sinks.solrSink.batchDurationMillis = 1000
+agent.sinks.solrSink.morphlineFile = /etc/flume-ng/conf/morphline.conf
+#agent.sinks.solrSink.morphlineId = morphline1
+
+#agent.sinks.loggerSink.type = logger
+#agent.sinks.loggerSink.channel = memoryChannel
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..49cbbca
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,286 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>apache</artifactId>
+    <version>29</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-twitter-parent</artifactId>
+  <name>Flume Twitter Parent</name>
+  <version>2.0.0</version>
+  <packaging>pom</packaging>
+
+  <properties>
+    <ReleaseVersion>2.0.0</ReleaseVersion>
+    <ReleaseManager>Ralph Goers</ReleaseManager>
+    <ReleaseKey>B3D8E1BA</ReleaseKey>
+    <SigningUserName>[email protected]</SigningUserName>
+    <checksum-maven-plugin.version>1.11</checksum-maven-plugin.version>
+    <fest-reflect.version>1.4</fest-reflect.version>
+    <findsecbugs-plugin.version>1.12.0</findsecbugs-plugin.version>
+    <flume.version>1.11.0</flume.version>
+    <junit.version>4.13.2</junit.version>
+    <log4j.version>2.20.0</log4j.version>
+    <maven.compiler.source>11</maven.compiler.source>
+    <maven.compiler.target>11</maven.compiler.target>
+    <module.name>org.apache.flume.twitter</module.name>
+    <mvn-gpg-plugin.version>1.6</mvn-gpg-plugin.version>
+    <mvn-javadoc-plugin.version>2.9</mvn-javadoc-plugin.version>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <rat.version>0.12</rat.version>
+    <slf4j.version>1.7.32</slf4j.version>
+    <spotbugs-maven-plugin.version>4.7.2.1</spotbugs-maven-plugin.version>
+    <spotless-maven-plugin.version>2.27.2</spotless-maven-plugin.version>
+    <twitter4j.version>4.0.7</twitter4j.version>
+    <twitter4j-media.version>4.0.6</twitter4j-media.version>
+  </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-twitter-source</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-core</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-node</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-api</artifactId>
+        <version>${log4j.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-core</artifactId>
+        <version>${log4j.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-slf4j-impl</artifactId>
+        <version>${log4j.version}</version>
+      </dependency>
+
+      <!-- Dependencies of the Twitter source -->
+      <dependency>
+        <groupId>org.twitter4j</groupId>
+        <artifactId>twitter4j-core</artifactId>
+        <version>${twitter4j.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.twitter4j</groupId>
+        <artifactId>twitter4j-media-support</artifactId>
+        <version>${twitter4j-media.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.twitter4j</groupId>
+        <artifactId>twitter4j-stream</artifactId>
+        <version>${twitter4j.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.easytesting</groupId>
+        <artifactId>fest-reflect</artifactId>
+        <version>${fest-reflect.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+        <version>${junit.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-api</artifactId>
+        <version>${slf4j.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <inceptionYear>2022</inceptionYear>
+
+  <issueManagement>
+    <system>JIRA</system>
+    <url>https://issues.apache.org/jira/browse/FLUME</url>
+  </issueManagement>
+
+  <licenses>
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+    </license>
+  </licenses>
+
+  <mailingLists>
+    <mailingList>
+      <archive>http://mail-archives.apache.org/mod_mbox/flume-user/</archive>
+      <name>Flume User List</name>
+      <post>[email protected]</post>
+      <subscribe>[email protected]</subscribe>
+      <unsubscribe>[email protected]</unsubscribe>
+    </mailingList>
+    <mailingList>
+      <archive>http://mail-archives.apache.org/mod_mbox/flume-dev/</archive>
+      <name>Flume Developer List</name>
+      <post>[email protected]</post>
+      <subscribe>[email protected]</subscribe>
+      <unsubscribe>[email protected]</unsubscribe>
+    </mailingList>
+    <mailingList>
+      <archive>http://mail-archives.apache.org/mod_mbox/flume-commits/</archive>
+      <name>Flume Commits</name>
+      <post>[email protected]</post>
+      <subscribe>[email protected]</subscribe>
+      <unsubscribe>[email protected]</unsubscribe>
+    </mailingList>
+  </mailingLists>
+
+  <scm>
+    <url>https://gitbox.apache.org/repos/asf/flume-twitter.git</url>
+    <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flume-twitter.git</developerConnection>
+    <connection>scm:git:https://gitbox.apache.org/repos/asf/flume-twitter.git</connection>
+  </scm>
+
+  <developers>
+    <developer>
+      <name>Ralph Goers</name>
+      <id>rgoers</id>
+      <email>[email protected]</email>
+      <organization>Intuit</organization>
+    </developer>
+  </developers>
+
+  <organization>
+    <name>Apache Software Foundation</name>
+    <url>http://www.apache.org</url>
+  </organization>
+  <modules>
+    <module>flume-twitter-source</module>
+  </modules>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <version>${rat.version}</version>
+        <configuration>
+          <excludes>
+            <exclude>**/.idea/</exclude>
+            <exclude>**/*.iml</exclude>
+            <exclude>src/main/resources/META-INF/services/**/*</exclude>
+            <exclude>**/nb-configuration.xml</exclude>
+            <exclude>.git/</exclude>
+            <exclude>patchprocess/</exclude>
+            <exclude>.gitignore</exclude>
+            <exclude>**/*.yml</exclude>
+            <exclude>**/*.yaml</exclude>
+            <exclude>**/*.json</exclude>
+            <!-- ASF jenkins box puts the Maven repo in our root directory. -->
+            <exclude>.repository/</exclude>
+            <exclude>**/*.diff</exclude>
+            <exclude>**/*.patch</exclude>
+            <exclude>**/*.avsc</exclude>
+            <exclude>**/*.avro</exclude>
+            <exclude>**/docs/**</exclude>
+            <exclude>**/test/resources/**</exclude>
+            <exclude>**/.settings/*</exclude>
+            <exclude>**/.classpath</exclude>
+            <exclude>**/.project</exclude>
+            <exclude>**/target/**</exclude>
+            <exclude>**/derby.log</exclude>
+            <exclude>**/metastore_db/</exclude>
+            <exclude>.mvn/**</exclude>
+            <exclude>**/exclude-pmd.properties</exclude>
+          </excludes>
+          <consoleOutput>true</consoleOutput>
+        </configuration>
+        <executions>
+          <execution>
+            <id>verify.rat</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>release</id>
+      <modules>
+        <module>flume-twitter-dist</module>
+      </modules>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-gpg-plugin</artifactId>
+            <version>${mvn-gpg-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>verify</phase>
+                <goals>
+                  <goal>sign</goal>
+                </goals>
+                <configuration>
+                  <keyname>${SigningUserName}</keyname>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <version>${mvn-javadoc-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>javadoc-jar</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>aggregate-jar</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <additionalparam>-Xdoclint:none</additionalparam>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+</project>
