[ https://issues.apache.org/jira/browse/SPARK-13065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrew Davidson updated SPARK-13065:
------------------------------------
    Attachment: twitterFilterQueryPatch.tar.gz

Sorry, bad name; it's not in patch format.

> streaming-twitter pass twitter4j.FilterQuery argument to TwitterUtils.createStream()
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-13065
>                 URL: https://issues.apache.org/jira/browse/SPARK-13065
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.6.0
>         Environment: all
>            Reporter: Andrew Davidson
>            Priority: Minor
>              Labels: twitter
>         Attachments: twitterFilterQueryPatch.tar.gz
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> The Twitter stream API is very powerful and provides a lot of support for
> twitter.com-side filtering of status objects. Whenever possible we want to
> let Twitter do as much of the work as possible for us.
> Currently the Spark Twitter API only allows you to configure a small subset
> of the possible filters:
> String[] filters = {"tag1", "tag2"};
> JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);
> The current implementation does:
> private[streaming]
> class TwitterReceiver(
>     twitterAuth: Authorization,
>     filters: Seq[String],
>     storageLevel: StorageLevel
>   ) extends Receiver[Status](storageLevel) with Logging {
> . . .
>       val query = new FilterQuery
>       if (filters.size > 0) {
>         query.track(filters.mkString(","))
>         newTwitterStream.filter(query)
>       } else {
>         newTwitterStream.sample()
>       }
> ...
> Rather than construct the FilterQuery object in TwitterReceiver.onStart(), we
> should be able to pass in a FilterQuery object.
> Looks like an easy fix.
> See the source code links below.
> Kind regards,
> Andy
> https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L60
> https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
> $$$$$$$$$ 2/2/16
> Attached is my Java implementation for this problem. Feel free to reuse it
> however you like. In my streaming Spark app's main() I have the following code:
> FilterQuery query = config.getFilterQuery().fetch();
> if (query != null) {
>     // TODO https://issues.apache.org/jira/browse/SPARK-13065
>     tweets = TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);
> } /* else
>     Spark native API:
>     String[] filters = {"tag1", "tag2"};
>     tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);
>
>     see
>     https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
>
>     which causes
>       val query = new FilterQuery
>       if (filters.size > 0) {
>         query.track(filters.mkString(","))
>         newTwitterStream.filter(query)
>       }
> */

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
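The branching described in this issue (use a caller-supplied FilterQuery if there is one, otherwise build a track query from the keyword filters, otherwise fall back to the sample stream) can be modelled as a small self-contained Java sketch. FilterSpec and StreamSource are hypothetical stand-ins for twitter4j.FilterQuery and the filter()/sample() calls on a TwitterStream, so the sketch runs without twitter4j or Spark on the classpath. It is not the attached patch and not Spark's TwitterReceiver; it only illustrates the proposed decision order.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FilterQueryReceiverSketch {

    // Hypothetical stand-in for twitter4j.FilterQuery: holds track keywords only.
    static final class FilterSpec {
        private final List<String> track;

        FilterSpec(String... keywords) {
            this.track = Arrays.asList(keywords);
        }

        List<String> track() {
            return track;
        }
    }

    // Hypothetical stand-in for the two TwitterStream calls the receiver makes.
    interface StreamSource {
        void filter(FilterSpec query);

        void sample();
    }

    private final FilterSpec query;     // caller-built query; null means "not supplied"
    private final List<String> filters; // legacy keyword filters; may be empty

    FilterQueryReceiverSketch(FilterSpec query, List<String> filters) {
        this.query = query;
        this.filters = filters;
    }

    // Mirrors onStart(): prefer the caller's query, else build one from keywords, else sample.
    void start(StreamSource stream) {
        if (query != null) {
            stream.filter(query); // proposed path: pass the query through unchanged
        } else if (!filters.isEmpty()) {
            // existing path: one comma-joined track string, like filters.mkString(",")
            stream.filter(new FilterSpec(String.join(",", filters)));
        } else {
            stream.sample(); // no filters at all: fall back to the sample stream
        }
    }

    public static void main(String[] args) {
        StreamSource printer = new StreamSource() {
            @Override
            public void filter(FilterSpec q) {
                System.out.println("filter " + q.track());
            }

            @Override
            public void sample() {
                System.out.println("sample");
            }
        };
        new FilterQueryReceiverSketch(new FilterSpec("spark"), Collections.<String>emptyList()).start(printer);
        new FilterQueryReceiverSketch(null, Arrays.asList("tag1", "tag2")).start(printer);
        new FilterQueryReceiverSketch(null, Collections.<String>emptyList()).start(printer);
    }
}
```

Running main() prints "filter [spark]", then "filter [tag1,tag2]", then "sample". Passing the query object through unchanged is what lets a caller use any FilterQuery option (for example follow or locations) without the receiver needing a dedicated parameter for each one.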