That was pseudo code; a working version would look like this:
val stream = TwitterUtils.createStream(ssc, None)

// Hashtags from each tweet, lower-cased so matching is case-insensitive.
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
  .map(_.toLowerCase)

// (hashtag, count) pairs over a 10-second window, sorted by count descending.
val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))
  .map { case (count, topic) => (topic, count) }
topCounts10.print()

// The join keeps only the hashtags we are interested in.
val filteredStream = topCounts10.transform { rdd =>
  val samplehashtags = ssc.sparkContext.parallelize(
    Array("#RobinWilliams".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase))
  val newRDD = samplehashtags.map { x => (x, 1) }
  newRDD.join(rdd)
}
filteredStream.print()

Thanks
Best Regards
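Note that the join above filters the windowed hashtag counts, not the tweets themselves. If the goal is to keep the matching tweets, a similar join can be applied directly to the status stream. The following is only a rough sketch of that idea, not code from the thread; the sample hashtags are placeholders and `stream` is assumed to be the DStream[Status] created above. Each status is keyed by the hashtags it contains and then joined against the hashtags of interest.

val matchingTweets = stream.transform { rdd =>
  // Hashtags we want to keep (placeholders), as a pair RDD so it can be joined.
  val wantedTags = ssc.sparkContext.parallelize(
    Seq("#robinwilliams", "#android", "#iphone")).map(tag => (tag, ()))
  // Key every tweet by each lower-cased hashtag it contains.
  val byTag = rdd.flatMap(status =>
    status.getText.split(" ")
      .filter(_.startsWith("#"))
      .map(tag => (tag.toLowerCase, status)))
  // The join drops tweets whose hashtags are not in wantedTags.
  byTag.join(wantedTags).map { case (_, (status, _)) => status }
}
matchingTweets.print()

A tweet containing more than one of the wanted hashtags will appear once per matching hashtag.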
On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

> Hi Akhil and Jorn,
>
> I tried as you suggested to create a simple scenario, but I get an error on
> rdd.join(newRDD): "value join is not a member of
> org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:
>
>> val stream = TwitterUtils.createStream(ssc, auth)
>> val filteredStream = stream.transform(rdd => {
>>   val samplehashtags = Array("music", "film")
>>   val newRDD = samplehashtags.map { x => (x, 1) }
>>   rdd.join(newRDD)
>> })
>
> Did I miss something here?
>
> Thanks,
> Zoran
>
> On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:
>
>> Thanks for the explanation.
>>
>> If I understand this correctly, with this approach I would actually stream
>> everything from Twitter and perform the filtering in my application using
>> Spark. Isn't that too much overhead if my application is only interested in
>> a couple of hundred or a couple of thousand hashtags?
>> On one hand this is the better approach, since I won't have the problem of
>> opening new streams once the number of hashtags goes over 400, which is the
>> Twitter limit for user-stream filtering. On the other hand, I'm concerned
>> about how much it will affect application performance if I stream everything
>> posted on Twitter and filter it locally. It would be great if somebody with
>> experience on this could comment on these concerns.
>>
>> Thanks,
>> Zoran
>>
>> On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>>> Jorn meant something like this:
>>>
>>> val filteredStream = twitterStream.transform(rdd => {
>>>   val newRDD = scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
>>>   rdd.join(newRDD)
>>> })
>>>
>>> newRDD will work like a filter when you do the join.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:
>>>
>>>> Hi Jorn,
>>>>
>>>> I didn't know that it is possible to change the filter without re-opening
>>>> the twitter stream. I actually asked that question earlier on stackoverflow
>>>> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
>>>> and the answer I got was that it's not possible. It would be even better if
>>>> there were also a way to add new hashtags, or to remove old hashtags that a
>>>> user stopped following; I guess the second request would be more difficult.
>>>>
>>>> However, it would be great if you could give me a short example of how to
>>>> do this. I didn't understand from your explanation what you mean by "join
>>>> it with a rdd loading the newest hash tags from disk in a regular interval".
>>>>
>>>> Thanks,
>>>> Zoran
>>>>
>>>> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>>>>
>>>>> Why do you even want to stop it? You can join it with an rdd that loads
>>>>> the newest hashtags from disk at a regular interval.
>>>>>
>>>>> On Sun, Jul 19, 2015 at 7:40 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a twitter spark stream initialized in the following way:
>>>>>>
>>>>>>> val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>>>>> val config = getTwitterConfigurationBuilder.build()
>>>>>>> val auth: Option[twitter4j.auth.Authorization] =
>>>>>>>   Some(new twitter4j.auth.OAuthAuthorization(config))
>>>>>>> val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>>>
>>>>>> This works fine when I initially start it. However, at some point I need
>>>>>> to update the filters, since users might add new hashtags they want to
>>>>>> follow. I tried to stop the running stream and the spark streaming context
>>>>>> without stopping the spark context, e.g.:
>>>>>>
>>>>>>> stream.stop()
>>>>>>> ssc.stop(false)
>>>>>>
>>>>>> Afterwards, when I try to initialize a new Twitter stream like I did
>>>>>> previously, I get this exception:
>>>>>>
>>>>>>> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>>>>>>>   at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>>>>   at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>>>   at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>>>>   at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>>>>   at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>>>>   at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>>>>   at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>>>>   at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>>>>   at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>>>>   at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>>>>   at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>>>>   at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>>>>   at java.util.TimerThread.run(Timer.java:505)
>>>>>>> INFO  [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
>>>>>>> ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0
>>>>>>>   org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>>>
>>>>>> Can anybody explain how to solve this issue?
>>>>>>
>>>>>> Thanks,
>>>>>> Zoran
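The IllegalStateException above is thrown because a StreamingContext does not allow new DStreams to be registered once it has been stopped, which is why the suggestions higher up in the thread keep a single unfiltered stream open and move the filtering into the application. Below is only a rough sketch of that idea under stated assumptions: the file path is a placeholder, `stream` is assumed to be the DStream[Status] created without filters, and it uses a filter against a re-read hashtag list rather than a join.

val filteredStream = stream.transform { rdd =>
  // Re-read the hashtag list on every batch; the file (placeholder path)
  // holds one hashtag per line and can be updated while the job is running.
  val tags = rdd.sparkContext.textFile("/path/to/hashtags.txt")
    .map(_.trim.toLowerCase)
    .collect()
    .toSet
  // Keep only tweets that mention at least one of the listed hashtags.
  rdd.filter(status =>
    status.getText.split(" ").exists(word => tags.contains(word.toLowerCase)))
}
filteredStream.print()

Compared with re-creating the stream, this never touches the StreamingContext, so there is nothing to stop or restart when the hashtag list changes.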
>
> --
> *******************************************************************************
> Zoran Jeremic, PhD
> Senior System Analyst & Programmer
>
> Athabasca University
> Tel: +1 604 92 89 944
> E-mail: zoran.jere...@gmail.com
> Homepage: http://zoranjeremic.org
> *******************************************************************************