This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 82ab144e6d3 [cleanup][io] Remove Pulsar IO Twitter connector (#25080)
82ab144e6d3 is described below
commit 82ab144e6d38b5544ab814b02abf8c4099b94150
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 16 19:27:23 2025 +0200
[cleanup][io] Remove Pulsar IO Twitter connector (#25080)
---
deployment/terraform-ansible/deploy-pulsar.yaml | 1 -
distribution/io/src/assemble/io.xml | 1 -
pulsar-bom/pom.xml | 5 -
pulsar-functions/worker/pom.xml | 25 ++-
.../rest/api/v3/AbstractFunctionsResourceTest.java | 19 ++-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 2 +-
.../rest/api/v3/SourceApiV3ResourceTest.java | 50 +++---
.../java/org/apache/pulsar/io/core/PushSource.java | 2 +-
pulsar-io/docs/pom.xml | 5 -
pulsar-io/pom.xml | 4 +-
pulsar-io/twitter/pom.xml | 91 -----------
.../apache/pulsar/io/twitter/TwitterFireHose.java | 172 ---------------------
.../pulsar/io/twitter/TwitterFireHoseConfig.java | 172 ---------------------
.../apache/pulsar/io/twitter/data/TweetData.java | 130 ----------------
.../pulsar/io/twitter/data/TwitterRecord.java | 65 --------
.../pulsar/io/twitter/data/package-info.java | 19 ---
.../twitter/endpoint/SampleStatusesEndpoint.java | 41 -----
.../pulsar/io/twitter/endpoint/package-info.java | 19 ---
.../org/apache/pulsar/io/twitter/package-info.java | 19 ---
.../resources/META-INF/services/pulsar-io.yaml | 23 ---
.../io/twitter/TwitterFireHoseConfigTest.java | 108 -------------
.../twitter/src/test/resources/sourceConfig.yaml | 23 ---
22 files changed, 62 insertions(+), 934 deletions(-)
diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml
b/deployment/terraform-ansible/deploy-pulsar.yaml
index 698a1c01cd8..fede8391994 100644
--- a/deployment/terraform-ansible/deploy-pulsar.yaml
+++ b/deployment/terraform-ansible/deploy-pulsar.yaml
@@ -161,7 +161,6 @@
# - rabbitmq
# - redis
# - solr
-# - twitter
- name: Set up broker
template:
src: "../templates/broker.conf"
diff --git a/distribution/io/src/assemble/io.xml
b/distribution/io/src/assemble/io.xml
index ded0557d5e3..3e4ed50d939 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -45,7 +45,6 @@
-->
<file><source>${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar</source></file>
-
<file><source>${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/http/target/pulsar-io-http-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
diff --git a/pulsar-bom/pom.xml b/pulsar-bom/pom.xml
index 1edb4590327..f004d63363d 100644
--- a/pulsar-bom/pom.xml
+++ b/pulsar-bom/pom.xml
@@ -575,11 +575,6 @@
<artifactId>pulsar-io-solr</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io-twitter</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io</artifactId>
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 02171c8276f..b27b6d73574 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -179,7 +179,15 @@
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-twitter</artifactId>
+ <artifactId>pulsar-io-data-generator</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-netty</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
@@ -245,12 +253,21 @@
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-twitter</artifactId>
+ <artifactId>pulsar-io-data-generator</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}</outputDirectory>
+ <destFileName>pulsar-io-data-generator.nar</destFileName>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-netty</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
- <destFileName>pulsar-io-twitter.nar</destFileName>
+ <destFileName>pulsar-io-netty.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
@@ -282,10 +299,10 @@
<configuration>
<systemPropertyVariables>
<pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
+
<pulsar-io-netty.nar.path>${project.build.directory}/pulsar-io-netty.nar</pulsar-io-netty.nar.path>
<pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
<pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
<pulsar-io-cassandra.nar.path>${project.build.directory}/pulsar-io-cassandra.nar</pulsar-io-cassandra.nar.path>
-
<pulsar-io-twitter.nar.path>${project.build.directory}/pulsar-io-twitter.nar</pulsar-io-twitter.nar.path>
<!-- valid jar file that is not a valid nar file -->
<pulsar-io-invalid.nar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-io-invalid.nar.path>
</systemPropertyVariables>
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java
index dd67c39c75f..aafd78cbba7 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java
@@ -86,7 +86,8 @@ public abstract class AbstractFunctionsResourceTest {
protected static final String CASSANDRA_STRING_SINK =
"org.apache.pulsar.io.cassandra.CassandraStringSink";
protected static final int PARALLELISM = 1;
private static final String SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH =
"pulsar-io-cassandra.nar.path";
- private static final String SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH =
"pulsar-io-twitter.nar.path";
+ private static final String SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH =
"pulsar-io-data-generator.nar.path";
+ private static final String SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH =
"pulsar-io-netty.nar.path";
private static final String SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH =
"pulsar-io-invalid.nar.path";
private static final String
SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH =
"pulsar-functions-api-examples.nar.path";
@@ -123,10 +124,16 @@ public abstract class AbstractFunctionsResourceTest {
+ SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH + "
system property"));
}
- public static File getPulsarIOTwitterNar() {
- return new
File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH)
- , "pulsar-io-twitter.nar file location must be specified with "
- + SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH + "
system property"));
+ public static File getPulsarIODataGenNar() {
+ return new
File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH)
+ , "pulsar-io-data-generator.nar file location must be
specified with "
+ + SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH + "
system property"));
+ }
+
+ public static File getPulsarIONettyNar() {
+ return new
File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH)
+ , "pulsar-io-netty.nar file location must be specified with "
+ + SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH + " system
property"));
}
public static File getPulsarIOInvalidNar() {
@@ -211,7 +218,7 @@ public abstract class AbstractFunctionsResourceTest {
}
protected File getDefaultNarFile() {
- return getPulsarIOTwitterNar();
+ return getPulsarIODataGenNar();
}
protected void doSetup() throws Exception {
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 7c0929a6a9f..2ef2e42d6e7 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -226,7 +226,7 @@ public class SinkApiV3ResourceTest extends
AbstractFunctionsResourceTest {
public void testRegisterSinkInvalidJarNoSink() throws IOException {
mockInstanceUtils();
try {
- try (FileInputStream inputStream = new
FileInputStream(getPulsarIOTwitterNar())) {
+ try (FileInputStream inputStream = new
FileInputStream(getPulsarIONettyNar())) {
testRegisterSinkMissingArguments(
TENANT,
NAMESPACE,
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 81170cf569a..f84b3c2e524 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -74,7 +74,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
private static final String source = "test-source";
private static final String outputTopic = "test-output-topic";
private static final String outputSerdeClassName =
TopicSchema.DEFAULT_SERDE;
- private static final String TWITTER_FIRE_HOSE =
"org.apache.pulsar.io.twitter.TwitterFireHose";
+ private static final String DATAGEN_SOURCE =
"org.apache.pulsar.io.datagenerator.DataGeneratorSource";
private SourcesImpl resource;
@Override
@@ -109,7 +109,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
null
);
@@ -130,7 +130,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
null
);
@@ -151,7 +151,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
null
);
@@ -206,7 +206,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
@Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Source Package is not provided")
public void testRegisterSourceMissingPackageDetails() throws IOException {
- try (InputStream inputStream = new
FileInputStream(getPulsarIOTwitterNar())) {
+ try (InputStream inputStream = new
FileInputStream(getPulsarIODataGenNar())) {
testRegisterSourceMissingArguments(
TENANT,
NAMESPACE,
@@ -215,7 +215,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
null,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
null
);
@@ -271,7 +271,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
@Test
public void testRegisterSourceNoOutputTopic() throws IOException {
- try (InputStream inputStream = new
FileInputStream(getPulsarIOTwitterNar())) {
+ try (InputStream inputStream = new
FileInputStream(getPulsarIODataGenNar())) {
testRegisterSourceMissingArguments(
TENANT,
NAMESPACE,
@@ -280,7 +280,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
null,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
null
);
@@ -302,7 +302,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
null,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
"http://localhost:1234/test"
);
@@ -386,7 +386,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
}
private void registerDefaultSource() throws IOException {
-
registerDefaultSourceWithPackageUrl(getPulsarIOTwitterNar().toURI().toString());
+
registerDefaultSourceWithPackageUrl(getPulsarIODataGenNar().toURI().toString());
}
private void registerDefaultSourceWithPackageUrl(String packageUrl) throws
IOException {
@@ -482,7 +482,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
when(mockedManager.containsFunction(eq(actualTenant),
eq(actualNamespace), eq(actualName))).thenReturn(false);
SourceConfig sourceConfig = createDefaultSourceConfig();
- try (InputStream inputStream = new
FileInputStream(getPulsarIOTwitterNar())) {
+ try (InputStream inputStream = new
FileInputStream(getPulsarIODataGenNar())) {
resource.registerSource(
actualTenant,
actualNamespace,
@@ -545,7 +545,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
"Tenant is not provided");
} catch (RestException re) {
@@ -565,7 +565,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
"Namespace is not provided");
} catch (RestException re) {
@@ -585,7 +585,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
"Source name is not provided");
} catch (RestException re) {
@@ -654,7 +654,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
-2,
"Source parallelism must be a positive number");
} catch (RestException re) {
@@ -668,7 +668,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
try {
mockWorkerUtils();
- try (FileInputStream inputStream = new
FileInputStream(getPulsarIOTwitterNar())) {
+ try (FileInputStream inputStream = new
FileInputStream(getPulsarIODataGenNar())) {
testUpdateSourceMissingArguments(
TENANT,
NAMESPACE,
@@ -677,7 +677,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM + 1,
null);
}
@@ -691,7 +691,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
public void testUpdateSourceChangedTopic() throws Exception {
mockWorkerUtils();
- try (FileInputStream inputStream = new
FileInputStream(getPulsarIOTwitterNar())) {
+ try (FileInputStream inputStream = new
FileInputStream(getPulsarIODataGenNar())) {
testUpdateSourceMissingArguments(
TENANT,
NAMESPACE,
@@ -700,7 +700,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
"DifferentTopic",
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
PARALLELISM,
null);
}
@@ -756,7 +756,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
// no changes but set the auth-update flag to true, should not fail
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(true);
- try (InputStream inputStream = new
FileInputStream(getPulsarIOTwitterNar())) {
+ try (InputStream inputStream = new
FileInputStream(getPulsarIODataGenNar())) {
resource.updateSource(
sourceConfig.getTenant(),
sourceConfig.getNamespace(),
@@ -784,7 +784,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
mockedFormData,
outputTopic,
outputSerdeClassName,
- TWITTER_FIRE_HOSE,
+ DATAGEN_SOURCE,
0,
"Source parallelism must be a positive number");
} catch (RestException re) {
@@ -861,7 +861,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
}
private void updateDefaultSource() throws Exception {
-
updateDefaultSourceWithPackageUrl(getPulsarIOTwitterNar().toURI().toString());
+
updateDefaultSourceWithPackageUrl(getPulsarIODataGenNar().toURI().toString());
}
private void updateDefaultSourceWithPackageUrl(String packageUrl) throws
Exception {
@@ -910,7 +910,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(),
any())).thenReturn(mockedFunctionMetaData);
- try (InputStream inputStream = new
FileInputStream(getPulsarIOTwitterNar())) {
+ try (InputStream inputStream = new
FileInputStream(getPulsarIODataGenNar())) {
resource.updateSource(
TENANT,
NAMESPACE,
@@ -940,7 +940,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
public void testUpdateSourceWithUrl() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
- String filePackageUrl = getPulsarIOTwitterNar().toURI().toString();
+ String filePackageUrl = getPulsarIODataGenNar().toURI().toString();
SourceConfig sourceConfig = createDefaultSourceConfig();
@@ -1454,7 +1454,7 @@ public class SourceApiV3ResourceTest extends
AbstractFunctionsResourceTest {
sourceConfig.setTenant(TENANT);
sourceConfig.setNamespace(NAMESPACE);
sourceConfig.setName(source);
- sourceConfig.setClassName(TWITTER_FIRE_HOSE);
+ sourceConfig.setClassName(DATAGEN_SOURCE);
sourceConfig.setParallelism(PARALLELISM);
sourceConfig.setTopicName(outputTopic);
sourceConfig.setSerdeClassName(outputSerdeClassName);
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index 6acccdda121..47cc31d5a53 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.api.Record;
/**
* Pulsar's Push Source interface. PushSource read data from
- * external sources (database changes, twitter firehose, etc)
+ * external sources (database changes, etc)
* and publish to a Pulsar topic. The reason its called Push is
* because PushSources get passed a consumer that they
* invoke whenever they have data to be published to Pulsar.
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index 2f0e2c002ab..143643843c6 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -207,11 +207,6 @@
<artifactId>pulsar-io-solr</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-twitter</artifactId>
- <version>${project.version}</version>
- </dependency>
</dependencies>
<build>
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 6e7614e6c19..570b7b6a253 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -49,7 +49,6 @@
<module>common</module>
<module>docs</module>
<module>aws</module>
- <module>twitter</module>
<module>cassandra</module>
<module>aerospike</module>
<module>http</module>
@@ -87,7 +86,6 @@
<module>batch-data-generator</module>
<module>common</module>
<module>aws</module>
- <module>twitter</module>
<module>cassandra</module>
<module>aerospike</module>
<module>http</module>
@@ -139,9 +137,9 @@
<module>batch-discovery-triggerers</module>
<module>batch-data-generator</module>
<module>common</module>
- <module>twitter</module>
<module>cassandra</module>
<module>data-generator</module>
+ <module>netty</module>
</modules>
</profile>
</profiles>
diff --git a/pulsar-io/twitter/pom.xml b/pulsar-io/twitter/pom.xml
deleted file mode 100644
index 229e551f2e0..00000000000
--- a/pulsar-io/twitter/pom.xml
+++ /dev/null
@@ -1,91 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io</artifactId>
- <version>4.2.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-io-twitter</artifactId>
- <name>Pulsar IO :: Twitter</name>
-
- <dependencies>
-
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>hbc-core</artifactId>
- <version>${hbc-core.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-collections4</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
-
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-common</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-deploy-plugin</artifactId>
- <configuration>
- <skip>${skipDeployConnector}</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-nar-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
deleted file mode 100644
index 30950ffefe2..00000000000
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.twitter.hbc.ClientBuilder;
-import com.twitter.hbc.common.DelimitedStreamReader;
-import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
-import com.twitter.hbc.httpclient.BasicClient;
-import com.twitter.hbc.httpclient.auth.Authentication;
-import com.twitter.hbc.httpclient.auth.OAuth1;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pulsar.io.common.IOConfigUtils;
-import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.apache.pulsar.io.twitter.data.TweetData;
-import org.apache.pulsar.io.twitter.data.TwitterRecord;
-import org.apache.pulsar.io.twitter.endpoint.SampleStatusesEndpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple Push based Twitter FireHose Source.
- */
-@Connector(
- name = "twitter",
- type = IOType.SOURCE,
- help = "A simple connector moving tweets from Twitter FireHose to Pulsar",
- configClass = TwitterFireHoseConfig.class
-)
-@Slf4j
-public class TwitterFireHose extends PushSource<TweetData> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(TwitterFireHose.class);
-
- // ----- Runtime fields
- private Object waitObject;
-
- private final ObjectMapper mapper = new ObjectMapper().configure(
- DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
- @Override
- public void open(Map<String, Object> config, SourceContext sourceContext)
throws IOException {
- TwitterFireHoseConfig hoseConfig =
IOConfigUtils.loadWithSecrets(config,
- TwitterFireHoseConfig.class, sourceContext);
- hoseConfig.validate();
- waitObject = new Object();
- startThread(hoseConfig);
- }
-
- @Override
- public void close() throws Exception {
- stopThread();
- }
-
- private void startThread(TwitterFireHoseConfig config) {
-
- BasicClient client = new ClientBuilder()
- .name(config.getClientName())
- .hosts(config.getClientHosts())
- .endpoint(getEndpoint(config))
- .authentication(getAuthentication(config))
- .processor(new HosebirdMessageProcessor() {
- public DelimitedStreamReader reader;
-
- @Override
- public void setup(InputStream input) {
- reader = new DelimitedStreamReader(input,
Constants.DEFAULT_CHARSET,
- config.getClientBufferSize());
- }
-
- @Override
- public boolean process() throws IOException,
InterruptedException {
- String tweetStr = reader.readLine();
- try {
- TweetData tweet = mapper.readValue(tweetStr,
TweetData.class);
- // We don't really care if the record succeeds or
not.
- // However might be in the future to count failures
- // TODO:- Figure out the metrics story for
connectors
- consume(new TwitterRecord(tweet,
config.getGuestimateTweetTime()));
- } catch (Exception e) {
- LOG.error("Exception thrown", e);
- }
- return true;
- }
- })
- .build();
-
- Thread runnerThread = new Thread(() -> {
- LOG.info("Started the Twitter FireHose Runner Thread");
- client.connect();
- LOG.info("Twitter Streaming API connection established
successfully");
-
- // just wait now
- try {
- synchronized (waitObject) {
- waitObject.wait();
- }
- } catch (Exception e) {
- LOG.info("Got a exception in waitObject");
- }
- LOG.debug("Closing Twitter Streaming API connection");
- client.stop();
- LOG.info("Twitter Streaming API connection closed");
- LOG.info("Twitter FireHose Runner Thread ending");
- });
- runnerThread.setName("TwitterFireHoseRunner");
- runnerThread.start();
- }
-
- private void stopThread() {
- LOG.info("Source closed");
- synchronized (waitObject) {
- waitObject.notify();
- }
- }
-
- private Authentication getAuthentication(TwitterFireHoseConfig config) {
- return new OAuth1(config.getConsumerKey(),
- config.getConsumerSecret(),
- config.getToken(),
- config.getTokenSecret());
- }
-
- private StreamingEndpoint getEndpoint(TwitterFireHoseConfig config) {
- List<Long> followings = config.getFollowings();
- List<String> terms = config.getTrackTerms();
-
- if (CollectionUtils.isEmpty(followings) &&
CollectionUtils.isEmpty(terms)) {
- return new SampleStatusesEndpoint().createEndpoint();
- } else {
- StatusesFilterEndpoint hosebirdEndpoint = new
StatusesFilterEndpoint();
-
- if (CollectionUtils.isNotEmpty(followings)) {
- hosebirdEndpoint.followings(followings);
- }
-
- if (CollectionUtils.isNotEmpty(terms)) {
- hosebirdEndpoint.trackTerms(terms);
- }
-
- return hosebirdEndpoint;
- }
- }
-}
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
deleted file mode 100644
index bb02cdde38b..00000000000
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.google.common.collect.Lists;
-import com.twitter.hbc.core.Constants;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import lombok.Data;
-import lombok.experimental.Accessors;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
-
-/**
- * Configuration object for the Twitter Firehose Connector.
- */
-@Data
-@Accessors(chain = true)
-public class TwitterFireHoseConfig implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- @FieldDoc(
- required = true,
- defaultValue = "",
- sensitive = true,
- help = "Your twitter app consumer key. See "
- +
"https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
- )
- private String consumerKey;
-
- @FieldDoc(
- required = true,
- defaultValue = "",
- sensitive = true,
- help = "Your twitter app consumer secret. "
- + "See
https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
- )
- private String consumerSecret;
-
- @FieldDoc(
- required = true,
- defaultValue = "",
- sensitive = true,
- help = "Your twitter app token. "
- + "See
https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
- )
- private String token;
-
- @FieldDoc(
- required = true,
- defaultValue = "",
- sensitive = true,
- help = "Your twitter app token secret. "
- + "See
https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens
for details"
- )
- private String tokenSecret;
-
- // Most firehose events have null createdAt time. If this parameter is set
to true
- // then we estimate the createdTime of each firehose event to be current
time.
- @FieldDoc(
- required = false,
- defaultValue = "false",
- help = "Most firehose events have null createdAt time.If this
parameter is set to true, "
- + "the connector estimates the createdTime of each firehose
event to be current time."
- )
- private Boolean guestimateTweetTime = false;
-
- // ------ Optional property keys
-
- @FieldDoc(
- required = false,
- defaultValue = "pulsario-twitter-source",
- help = "The Twitter Firehose Client name"
- )
- private String clientName = "pulsario-twitter-source";
-
- @FieldDoc(
- required = false,
- defaultValue = Constants.STREAM_HOST,
- help = "The Twitter Firehose stream hosts that the connector connects
to"
- )
- private String clientHosts = Constants.STREAM_HOST;
-
- @FieldDoc(
- required = false,
- defaultValue = "50000",
- help = "The Twitter Firehose client buffer size"
- )
- private int clientBufferSize = 50000;
-
- @FieldDoc(
- required = false,
- defaultValue = "",
- help = "A comma separated list of user IDs, indicating the users to
return statuses for in the stream."
- )
- private String followings;
-
- @FieldDoc(
- required = false,
- defaultValue = "",
- help = "Keywords to track. Phrases of keywords are specified by a
comma-separated list."
- )
- private String terms;
-
- public static TwitterFireHoseConfig load(String yamlFile) throws
IOException {
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- return mapper.readValue(new File(yamlFile),
TwitterFireHoseConfig.class);
- }
-
- public static TwitterFireHoseConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
TwitterFireHoseConfig.class);
- }
-
- public void validate() throws IllegalArgumentException {
- if (getConsumerKey() == null || getConsumerSecret() == null
- || getToken() == null || getTokenSecret() == null) {
- throw new IllegalArgumentException("Required property not set.");
- }
- }
-
- public List<Long> getFollowings() {
- if (StringUtils.isBlank(followings)) {
- return Collections.emptyList();
- }
-
- List<Long> result = new ArrayList<Long> ();
-
- for (String s: StringUtils.split(followings, ",")) {
- try {
- result.add(Long.parseLong(StringUtils.trim(s)));
- } catch (NumberFormatException nfEx) {
- // Ignore these
- }
- }
-
- return CollectionUtils.isEmpty(result) ? Collections.emptyList() :
result;
- }
-
- public List<String> getTrackTerms() {
- if (StringUtils.isBlank(terms)) {
- return Collections.emptyList();
- }
-
- return Lists.newArrayList(StringUtils.split(terms, ","));
- }
-}
\ No newline at end of file
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java
deleted file mode 100644
index 0b9a3199fa7..00000000000
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.data;
-
-import lombok.Data;
-
-/**
- * POJO for Tweet object.
- */
-@Data
-public class TweetData {
- private String createdAt;
- private Long id;
- private String idStr;
- private String text;
- private String source;
- private Boolean truncated;
- private User user;
- private RetweetedStatus retweetedStatus;
- private Boolean isQuoteStatus;
- private Long quoteCount;
- private Long replyCount;
- private Long retweetCount;
- private Long favoriteCount;
- private Boolean favorited;
- private Boolean retweeted;
- private String filterLevel;
- private String lang;
- private String timestampMs;
- private Delete delete;
-
- /**
- * POJO for Twitter User object.
- */
- @Data
- public static class User {
- private Long id;
- private String idStr;
- private String name;
- private String screenName;
- private String location;
- private String description;
- private String translatorType;
- private Boolean protectedUser;
- private Boolean verified;
- private Long followersCount;
- private Long friendsCount;
- private Long listedCount;
- private Long favouritesCount;
- private Long statusesCount;
- private String createdAt;
- private Boolean geoEnabled;
- private String lang;
- private Boolean contributorsEnabled;
- private Boolean isTranslator;
- private String profileBackgroundColor;
- private String profileBackgroundImageUrl;
- private String profileBackgroundImageUrlHttps;
- private Boolean profileBackgroundTile;
- private String profileLinkColor;
- private String profileSidebarBorderColor;
- private String profileSidebarFillColor;
- private String profileTextColor;
- private Boolean profileUseBackgroundImage;
- private String profileImageUrl;
- private String profileImageUrlHttps;
- private String profileBannerUrl;
- private Boolean defaultProfile;
- private Boolean defaultProfileImage;
- }
-
- /**
- * POJO for Re-Tweet object.
- */
- @Data
- public static class RetweetedStatus {
- private String createdAt;
- private Long id;
- private String idStr;
- private String text;
- private String source;
- private Boolean truncated;
- private User user;
- private Boolean isQuoteStatus;
- private Long quoteCount;
- private Long replyCount;
- private Long retweetCount;
- private Long favoriteCount;
- private Boolean favorited;
- private Boolean retweeted;
- private String filterLevel;
- private String lang;
- }
-
- /**
- * POJO for Tweet Status object.
- */
- @Data
- public static class Status {
- private Long id;
- private String idStr;
- private Long userId;
- private String userIdStr;
- }
-
- /**
- * POJO for Tweet Delete object.
- */
- @Data
- public static class Delete {
- private Status status;
- private String timestampMs;
- }
-}
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java
deleted file mode 100644
index 7292c05548b..00000000000
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.data;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Optional;
-import org.apache.pulsar.functions.api.Record;
-
-/**
- * Twitter Record object.
- */
-public class TwitterRecord implements Record<TweetData> {
- private final TweetData tweet;
- private static final SimpleDateFormat dateFormat = new
SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy");
- private final boolean guestimateTweetTime;
-
- public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) {
- this.tweet = tweet;
- this.guestimateTweetTime = guestimateTweetTime;
- }
-
- @Override
- public Optional<String> getKey() {
- // TODO: Could use user or tweet ID as key here
- return Optional.empty();
- }
-
- @Override
- public Optional<Long> getEventTime() {
- try {
- if (tweet.getCreatedAt() != null) {
- Date d = dateFormat.parse(tweet.getCreatedAt());
- return Optional.of(d.toInstant().toEpochMilli());
- } else if (guestimateTweetTime) {
- return Optional.of(System.currentTimeMillis());
- } else {
- return Optional.empty();
- }
- } catch (Exception e) {
- return Optional.empty();
- }
- }
-
- @Override
- public TweetData getValue() {
- return tweet;
- }
-}
\ No newline at end of file
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java
deleted file mode 100644
index eadf21b6a89..00000000000
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.data;
\ No newline at end of file
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java
deleted file mode 100644
index ca942076a16..00000000000
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.endpoint;
-
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import java.io.Serializable;
-
-/**
- * Required for Twitter Client.
- */
-public class SampleStatusesEndpoint implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public StreamingEndpoint createEndpoint() {
- // Returns the sample endpoint: Returning a sample from the firehose
(all tweets)
- StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
- endpoint.stallWarnings(false);
- endpoint.delimited(false);
- return endpoint;
- }
-}
\ No newline at end of file
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java
deleted file mode 100644
index 2bca50c06c1..00000000000
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter.endpoint;
\ No newline at end of file
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java
deleted file mode 100644
index 23f810a0240..00000000000
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter;
\ No newline at end of file
diff --git
a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
deleted file mode 100644
index 39d7d974a4e..00000000000
--- a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-name: twitter
-description: Ingest data from Twitter firehose
-sourceClass: org.apache.pulsar.io.twitter.TwitterFireHose
-sourceConfigClass: org.apache.pulsar.io.twitter.TwitterFireHoseConfig
diff --git
a/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTest.java
b/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTest.java
deleted file mode 100644
index 6f5f99ce260..00000000000
---
a/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.twitter;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.testng.annotations.Test;
-
-public class TwitterFireHoseConfigTest {
-
- private TwitterFireHoseConfig config;
-
- @Test
- public final void loadFromYamlFileTest() throws IOException {
- File yamlFile = getFile("sourceConfig.yaml");
- config = TwitterFireHoseConfig.load(yamlFile.getAbsolutePath());
- assertNotNull(config);
- }
-
- @Test
- public final void loadFromMapTest() throws IOException {
- Map<String, Object> map = new HashMap<> ();
- map.put("consumerKey", "xxx");
- map.put("consumerSecret", "xxx");
- map.put("token", "xxx");
- map.put("tokenSecret", "xxx");
-
- config = TwitterFireHoseConfig.load(map);
-
- assertNotNull(config);
- }
-
- @Test
- public final void validValidateTest() throws IOException {
- Map<String, Object> map = new HashMap<> ();
- map.put("consumerKey", "xxx");
- map.put("consumerSecret", "xxx");
- map.put("token", "xxx");
- map.put("tokenSecret", "xxx");
-
- config = TwitterFireHoseConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Required property not set.")
- public final void missingConsumerKeyValidateTest() throws IOException {
- Map<String, Object> map = new HashMap<> ();
-
- config = TwitterFireHoseConfig.load(map);
- config.validate();
- }
-
- @Test
- public final void getFollowingsTest() throws IOException {
- Map<String, Object> map = new HashMap<> ();
- map.put("followings", "123, 456, 789");
- config = TwitterFireHoseConfig.load(map);
-
- List<Long> followings = config.getFollowings();
- assertNotNull(followings);
- assertEquals(followings.size(), 3);
- assertTrue(followings.contains(123L));
- assertTrue(followings.contains(456L));
- assertTrue(followings.contains(789L));
- }
-
-
- @Test
- public final void getTermsTest() throws IOException {
- Map<String, Object> map = new HashMap<> ();
- map.put("terms", "mickey, donald, goofy");
- config = TwitterFireHoseConfig.load(map);
-
- List<String> terms = config.getTrackTerms();
- assertNotNull(terms);
- assertEquals(terms.size(), 3);
- assertTrue(terms.contains("mickey"));
- }
-
- private File getFile(String name) {
- ClassLoader classLoader = getClass().getClassLoader();
- return new File(classLoader.getResource(name).getFile());
- }
-
-}
diff --git a/pulsar-io/twitter/src/test/resources/sourceConfig.yaml
b/pulsar-io/twitter/src/test/resources/sourceConfig.yaml
deleted file mode 100644
index 9ac5708e37f..00000000000
--- a/pulsar-io/twitter/src/test/resources/sourceConfig.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
-"consumerKey": "",
-"consumerSecret": ""
-}
\ No newline at end of file