[ 
https://issues.apache.org/jira/browse/SPARK-13065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Davidson updated SPARK-13065:
------------------------------------
    Description: 
The Twitter streaming API is very powerful and provides a lot of support for 
twitter.com-side filtering of status objects. Whenever possible, we want to let 
Twitter do as much of the work as possible for us.

Currently the Spark Twitter API only allows you to configure a small subset of 
the possible filters:

String[] filters = {"tag1", "tag2"};
JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);

The current implementation does:

private[streaming]
class TwitterReceiver(
    twitterAuth: Authorization,
    filters: Seq[String],
    storageLevel: StorageLevel
  ) extends Receiver[Status](storageLevel) with Logging {

. . .


      val query = new FilterQuery
      if (filters.size > 0) {
        query.track(filters.mkString(","))
        newTwitterStream.filter(query)
      } else {
        newTwitterStream.sample()
      }

...

Rather than constructing the FilterQuery object in TwitterReceiver.onStart(), we 
should be able to pass in a FilterQuery object.
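
To make the proposal concrete, here is a minimal, self-contained Java sketch of 
the idea. SketchQuery and SketchReceiver are hypothetical stand-ins written for 
this example; they are not the twitter4j or Spark classes, and onStart() only 
returns a string describing the call a real receiver would make on the stream. 
The point is that the caller builds the query, and the keyword-only entry point 
becomes a thin wrapper around it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for twitter4j.FilterQuery (assumption: only two filter kinds shown).
final class SketchQuery {
    final List<String> track = new ArrayList<>();
    final List<Long> follow = new ArrayList<>();

    SketchQuery track(String... keywords) {
        track.addAll(Arrays.asList(keywords));
        return this;
    }

    SketchQuery follow(long... userIds) {
        for (long id : userIds) {
            follow.add(id);
        }
        return this;
    }
}

// Hypothetical stand-in for the receiver: it is handed a query instead of building one.
final class SketchReceiver {
    private final SketchQuery query; // null means "no filter, use the sample stream"

    SketchReceiver(SketchQuery query) {
        this.query = query;
    }

    // Keeps the existing keyword-only behaviour as a wrapper around the query-based constructor.
    static SketchReceiver fromKeywords(String... filters) {
        return new SketchReceiver(filters.length > 0 ? new SketchQuery().track(filters) : null);
    }

    // Describes which stream call would be made; a real receiver would call the stream here.
    String onStart() {
        if (query == null) {
            return "sample()";
        }
        return "filter(track=" + query.track + ", follow=" + query.follow + ")";
    }
}

final class FilterQuerySketch {
    public static void main(String[] args) {
        SketchQuery query = new SketchQuery().track("tag1", "tag2").follow(42L);
        System.out.println(new SketchReceiver(query).onStart());
        System.out.println(SketchReceiver.fromKeywords("tag1").onStart());
        System.out.println(SketchReceiver.fromKeywords().onStart());
    }
}
```

With this shape, existing callers that pass only keywords keep working, while 
callers that need other filters build the query themselves.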

Looks like an easy fix. See the source code links below.

kind regards

Andy

https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L60

https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89


$$$$$$$$$ 2/2/16
Attached is my Java implementation for this problem. Feel free to reuse it 
however you like. In my Spark streaming app's main() I have the following code:

        FilterQuery query = config.getFilterQuery().fetch();
        if (query != null) {
            // TODO https://issues.apache.org/jira/browse/SPARK-13065
            tweets = TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);
        } /* else
            Spark native API:
            String[] filters = {"tag1", "tag2"};
            tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);

            see
            https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89

            which causes
              val query = new FilterQuery
              if (filters.size > 0) {
                query.track(filters.mkString(","))
                newTwitterStream.filter(query)
              } */

> streaming-twitter pass twitter4j.FilterQuery argument to 
> TwitterUtils.createStream()
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-13065
>                 URL: https://issues.apache.org/jira/browse/SPARK-13065
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.6.0
>         Environment: all
>            Reporter: Andrew Davidson
>            Priority: Minor
>              Labels: twitter
>   Original Estimate: 2h
>  Remaining Estimate: 2h


