[ 
https://issues.apache.org/jira/browse/SPARK-13065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Davidson updated SPARK-13065:
------------------------------------
    Attachment: twitterFilterQueryPatch.tar.gz

Sorry, bad name: the attachment is not in patch format.

> streaming-twitter pass twitter4j.FilterQuery argument to 
> TwitterUtils.createStream()
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-13065
>                 URL: https://issues.apache.org/jira/browse/SPARK-13065
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.6.0
>         Environment: all
>            Reporter: Andrew Davidson
>            Priority: Minor
>              Labels: twitter
>         Attachments: twitterFilterQueryPatch.tar.gz
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> The Twitter streaming API is very powerful and provides a lot of support for 
> twitter.com-side filtering of status objects. Whenever possible, we want to 
> let Twitter do as much of the work as possible for us.
> Currently the Spark Twitter API only allows you to configure a small subset 
> of the possible filters:
> String[] filters = {"tag1", "tag2"};
> JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);
> The current implementation does:
> private[streaming]
> class TwitterReceiver(
>     twitterAuth: Authorization,
>     filters: Seq[String],
>     storageLevel: StorageLevel
>   ) extends Receiver[Status](storageLevel) with Logging {
> . . .
>       val query = new FilterQuery
>       if (filters.size > 0) {
>         query.track(filters.mkString(","))
>         newTwitterStream.filter(query)
>       } else {
>         newTwitterStream.sample()
>       }
> ...
> Rather than constructing the FilterQuery object in TwitterReceiver.onStart(), 
> we should be able to pass in a FilterQuery object.
> This looks like an easy fix. See the source code links below.
> Kind regards,
> Andy
> https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L60
> https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
> $$$$$$$$$ 2/2/16
> Attached is my Java implementation for this problem. Feel free to reuse it 
> however you like. In my streaming Spark app's main() I have the following code:
>         FilterQuery query = config.getFilterQuery().fetch();
>         if (query != null) {
>             // TODO https://issues.apache.org/jira/browse/SPARK-13065
>             tweets = TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);
>         } /* else
>             Spark native API:
>             String[] filters = {"tag1", "tag2"};
>             tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);
>
>             see
>             https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
>
>             which causes
>                 val query = new FilterQuery
>                 if (filters.size > 0) {
>                   query.track(filters.mkString(","))
>                   newTwitterStream.filter(query)
>                 } */
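
The request above amounts to letting the caller hand the receiver a ready-made query instead of a keyword list. A minimal, self-contained Java sketch of that shape follows. It does not use twitter4j or Spark: Query, fromKeywords and chooseEndpoint are hypothetical stand-ins invented for illustration (the real classes are twitter4j.FilterQuery and TwitterReceiver), and the sketch only mirrors the filter-or-sample decision quoted from onStart().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Illustration only: none of these types are twitter4j or Spark classes.
final class FilterQuerySketch {

    // Hypothetical stand-in for twitter4j.FilterQuery; it only records what was asked for.
    static final class Query {
        final List<String> track = new ArrayList<>();
        final List<Long> follow = new ArrayList<>();

        Query track(String... terms) {
            track.addAll(Arrays.asList(terms));
            return this;
        }

        Query follow(long... userIds) {
            for (long id : userIds) {
                follow.add(id);
            }
            return this;
        }
    }

    // Mirrors the existing behaviour: keywords are joined into a single track term,
    // and an empty keyword list means "no query".
    static Optional<Query> fromKeywords(List<String> filters) {
        if (filters.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new Query().track(String.join(",", filters)));
    }

    // Mirrors the filter-or-sample branch, but driven by a caller-supplied query.
    static String chooseEndpoint(Optional<Query> query) {
        return query.isPresent() ? "filter" : "sample";
    }

    public static void main(String[] args) {
        // Keyword-only path, as the current API allows.
        Optional<Query> byKeyword = fromKeywords(Arrays.asList("tag1", "tag2"));
        System.out.println(chooseEndpoint(byKeyword) + " " + byKeyword.get().track);

        // Caller-built query: combines criteria a keyword list cannot express.
        Optional<Query> custom = Optional.of(new Query().track("tag1").follow(42L));
        System.out.println(chooseEndpoint(custom) + " " + custom.get().follow);

        // No query at all falls back to the sample stream.
        System.out.println(chooseEndpoint(Optional.empty()));
    }
}
```

With the real libraries, the equivalent change would presumably be a TwitterUtils.createStream() overload that accepts a twitter4j.FilterQuery, while the keyword-based overloads keep working by building the query as they do today. That overload is an assumption here, not an existing Spark API.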



