[ https://issues.apache.org/jira/browse/SPARK-13065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrew Davidson updated SPARK-13065:
------------------------------------
Description:
The Twitter streaming API is very powerful and provides a lot of support for
twitter.com-side filtering of status objects. Whenever possible, we want to let
Twitter do as much of the work as possible for us.
Currently the Spark Twitter API only allows you to configure a small subset of
the possible filters:
    String[] filters = {"tag1", "tag2"};
    JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);
The current implementation does:
    private[streaming]
    class TwitterReceiver(
        twitterAuth: Authorization,
        filters: Seq[String],
        storageLevel: StorageLevel
      ) extends Receiver[Status](storageLevel) with Logging {
      ...
      val query = new FilterQuery
      if (filters.size > 0) {
        query.track(filters.mkString(","))
        newTwitterStream.filter(query)
      } else {
        newTwitterStream.sample()
      }
      ...
Rather than constructing the FilterQuery object in TwitterReceiver.onStart(), we
should be able to pass in a FilterQuery object.
This looks like an easy fix. See the source code links below.
kind regards
Andy
https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L60
https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
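
To make the proposal above concrete, here is a minimal sketch of the idea in
plain Java. It deliberately does not use the real classes: SketchFilterQuery and
SketchReceiver are hypothetical stand-ins for twitter4j.FilterQuery and
TwitterReceiver, and fromKeywords() is an invented name for the existing
keyword-only path. The only point is that the receiver accepts a query built by
the caller, so filters beyond track() can reach Twitter, while the keyword path
keeps working.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for twitter4j.FilterQuery; it only records what was configured. */
final class SketchFilterQuery {
    final List<String> track = new ArrayList<>();
    final List<Long> follow = new ArrayList<>();

    /** Keywords to match, analogous to a track filter. */
    SketchFilterQuery track(String... keywords) {
        track.addAll(Arrays.asList(keywords));
        return this;
    }

    /** User ids to follow, an example of a filter the keyword-only path cannot express. */
    SketchFilterQuery follow(long... userIds) {
        for (long id : userIds) {
            follow.add(id);
        }
        return this;
    }
}

/** Hypothetical stand-in for the receiver; it reports which stream call it would make. */
final class SketchReceiver {
    // null means "no filtering requested", which maps to the sample stream
    private final SketchFilterQuery query;

    /** Proposed path: the caller supplies a fully built query. */
    SketchReceiver(SketchFilterQuery query) {
        this.query = query;
    }

    /** Existing path: keywords only, converted into a track-only query. */
    static SketchReceiver fromKeywords(String... filters) {
        return new SketchReceiver(
            filters.length > 0 ? new SketchFilterQuery().track(filters) : null);
    }

    /** Mirrors the branch in onStart(): filter when a query exists, otherwise sample. */
    String onStart() {
        if (query == null) {
            return "sample()";
        }
        return "filter(track=" + query.track + ", follow=" + query.follow + ")";
    }
}
```

With this shape, SketchReceiver.fromKeywords("tag1", "tag2") behaves like the
current API, and new SketchReceiver(new SketchFilterQuery().track("spark").follow(12345L))
carries the extra follow filter through unchanged.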
$$$$$$$$$ 2/2/16
Attached is my Java implementation for this problem. Feel free to reuse it
however you like. In my streaming Spark app's main() I have the following code:
    FilterQuery query = config.getFilterQuery().fetch();
    if (query != null) {
        // TODO https://issues.apache.org/jira/browse/SPARK-13065
        tweets = TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);
    } /* else
        Spark native API:
            String[] filters = {"tag1", "tag2"};
            tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);
        see
        https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
        which causes
            val query = new FilterQuery
            if (filters.size > 0) {
                query.track(filters.mkString(","))
                newTwitterStream.filter(query)
            } */
> streaming-twitter pass twitter4j.FilterQuery argument to
> TwitterUtils.createStream()
> ------------------------------------------------------------------------------------
>
> Key: SPARK-13065
> URL: https://issues.apache.org/jira/browse/SPARK-13065
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.6.0
> Environment: all
> Reporter: Andrew Davidson
> Priority: Minor
> Labels: twitter
> Original Estimate: 2h
> Remaining Estimate: 2h
>