Hi,
I am following the Apache Flink Twitter streaming example at
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
to fetch tweets by hashtag, and I have created a custom endpoint for this.
However, my custom endpoint does not work (with the default endpoint of
TwitterSource, everything works fine).
*My CustomEndpoint is -*
public class CustomEndpoint implements EndpointInitializer, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public StreamingEndpoint createEndpoint() {
        AppleSampleEndpoint endpoint = new AppleSampleEndpoint();
        return endpoint;
    }
}
*AppleSampleEndpoint.java-*
public class AppleSampleEndpoint extends RawEndpoint {
    public static final String PATH =
        "https://api.twitter.com/1.1/search/tweets.json?q=%23bigdata";

    public AppleSampleEndpoint() {
        super(PATH, HttpConstants.HTTP_GET);
    }
}
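For reference, the %23 in the query string of PATH is the URL-encoded form of the # character. A minimal sketch of how that part of the path can be derived using only the JDK is below. The class HashtagQuery and its methods are hypothetical names used for illustration only; they are not part of Flink or the Twitter client library.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration; not part of Flink or the Twitter client.
class HashtagQuery {

    // Percent-encode a search term so it can be placed in a URL query string.
    static String encode(String term) {
        try {
            return URLEncoder.encode(term, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a required charset on every JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    // Build the "?q=..." suffix as it appears at the end of PATH.
    static String searchQuery(String hashtag) {
        return "?q=" + encode(hashtag);
    }

    public static void main(String[] args) {
        System.out.println(searchQuery("#bigdata")); // prints ?q=%23bigdata
        System.out.println(searchQuery("#apple"));   // prints ?q=%23apple
    }
}
```

So a search for the hashtag apple would need q=%23apple in the query string, while the PATH shown here uses q=%23bigdata.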
This is how I use CustomEndpoint in my main class:
DataStream<String> streamSource;
if (params.has(TwitterSource.CONSUMER_KEY) &&
        params.has(TwitterSource.CONSUMER_SECRET) &&
        params.has(TwitterSource.TOKEN) &&
        params.has(TwitterSource.TOKEN_SECRET)) {
    TwitterSource source = new TwitterSource(params.getProperties());
    // Replace the default sample endpoint with the custom one defined above.
    source.setCustomEndpointInitializer(new CustomEndpoint());
    streamSource = env.addSource(source);
}
But with this setup I am not able to fetch tweets with the hashtag apple.
Thanks
Devendra Vishwakarma