Hi Akhil,

That's exactly what I needed. You saved my day :)

Thanks a lot,

Best,
Zoran

On Fri, Jul 24, 2015 at 12:28 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Yes, that is correct, sorry for confusing you. But I guess this is what
> you are looking for; let me know if it doesn't help:
>
> val filtered_statuses = stream.transform(rdd =>{
>
>       // Instead of hardcoding, you can fetch these from MySQL, a file, or whatever
>       val sampleHashTags =
>         Array("#teenchoice".toLowerCase,"#android".toLowerCase,"#iphone".toLowerCase)
>
>       val filtered = rdd.filter(status =>{
>         var found = false
>         for(tag <- sampleHashTags){
>           if(status.getText.toLowerCase.contains(tag)) found = true
>         }
>         found
>       })
>
>       filtered
>     })
>
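
For reference, a minimal sketch of the same matching logic in plain Scala, so it can be tried without a cluster. `HashtagMatch`, `normalize` and `matchesAny` are hypothetical names used only for illustration; they are not part of Spark or twitter4j. It assumes the same substring semantics as the `contains` check above.

```scala
// Hypothetical helper for illustration only (not a Spark or twitter4j API).
object HashtagMatch {
  // Normalize the followed tags once, instead of lower-casing them per status.
  def normalize(tags: Seq[String]): Set[String] =
    tags.map(_.trim.toLowerCase).filter(_.nonEmpty).toSet

  // True when the text contains at least one followed tag.
  // Substring match, same semantics as the contains-based loop above.
  def matchesAny(text: String, tags: Set[String]): Boolean = {
    val lower = text.toLowerCase
    tags.exists(tag => lower.contains(tag))
  }
}
```

Inside the transform above, the predicate could then be used as `rdd.filter(status => HashtagMatch.matchesAny(status.getText, tags))`, with `tags` built once from wherever the list is stored. Note that substring matching means a followed tag such as #android also matches #androiddev.
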
> Thanks
> Best Regards
>
> On Fri, Jul 24, 2015 at 11:25 AM, Zoran Jeremic <zoran.jere...@gmail.com>
> wrote:
>
>> Hi Akhil,
>>
>> Thank you for sending this code. My apologies if I'm asking something
>> that is obvious here, since I'm a newbie in Scala, but I still don't see how
>> I can use this code. Maybe my original question was not very clear.
>>
>> What I need is to get each Twitter status that contains one of the
>> hashtags I'm following. I'm implementing a learning platform where each
>> course will have at least one hashtag, e.g. #cs270computersciencecourse. If
>> somebody posts anything on Twitter with that hashtag, I want to get it and
>> save the Twitter status in the system, so it can be shown in the application.
>> Other tweets should be ignored, but each tweet containing one of the
>> hashtags I'm following should be stored in the application, so I can't
>> process "most popular tweets" or something like that, where it's possible
>> that I miss somebody's post. There is a list of hashtags followed
>> by the stream, and users should be able to update this list.
>>
>> If I understood correctly, the code you sent me extracts hashtags from the
>> statuses received through the stream and continues processing these hashtags,
>> but at the end I will have only hashtags without statuses. Is that correct, or
>> did I miss something?
>>
>> Thanks,
>> Zoran
>>
>>
>>
>> On Wed, Jul 22, 2015 at 12:41 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> That was pseudocode; a working version would look like this:
>>>
>>>     val stream = TwitterUtils.createStream(ssc, None)
>>>
>>>     val hashTags = stream.flatMap(status =>
>>>       status.getText.split(" ").filter(_.startsWith("#"))).map(x => (x.toLowerCase,1))
>>>
>>>     val topCounts10 = hashTags.map((_, 1))
>>>       .reduceByKeyAndWindow(_ + _, Seconds(10))
>>>       .map{case (topic, count) => (count, topic)}
>>>       .transform(_.sortByKey(false)).map(x => x._2)
>>>
>>>     topCounts10.print()
>>>
>>>
>>>     val filteredStream = topCounts10.transform(rdd =>{
>>>       val samplehashtags = ssc.sparkContext.parallelize(
>>>         Array("#RobinWilliams".toLowerCase,"#android".toLowerCase,"#iphone".toLowerCase))
>>>       val newRDD = samplehashtags.map { x => (x,1) }
>>>       val joined = newRDD.join(rdd)
>>>
>>>       joined
>>>     })
>>>
>>>     filteredStream.print()
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic <zoran.jere...@gmail.com>
>>> wrote:
>>>
>>>> Hi Akhil and Jorn,
>>>>
>>>> I tried to create a simple scenario as you suggested, but I get an
>>>> error on rdd.join(newRDD): "value join is not a member of
>>>> org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like this:
>>>>
>>>>     val stream = TwitterUtils.createStream(ssc, auth)
>>>>     val filteredStream = stream.transform(rdd => {
>>>>       val samplehashtags = Array("music", "film")
>>>>       val newRDD = samplehashtags.map { x => (x, 1) }
>>>>       rdd.join(newRDD)
>>>>     })
>>>>
>>>> Did I miss something here?
>>>>
>>>> Thanks,
>>>> Zoran
>>>>
>>>> On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:
>>>>
>>>>> Thanks for the explanation.
>>>>>
>>>>> If I understand this correctly, in this approach I would actually
>>>>> stream everything from Twitter and perform the filtering in my application
>>>>> using Spark. Isn't this too much overhead if my application is interested
>>>>> in listening for a couple of hundred or even thousands of hashtags?
>>>>> On one side, this would be a better approach, since I would not have the
>>>>> problem of opening new streams if the number of hashtags goes over 400, which
>>>>> is the Twitter limit for user stream filtering, but on the other side I'm
>>>>> concerned about how much it will affect application performance if I stream
>>>>> everything that is posted on Twitter and filter it locally. It would be
>>>>> great if somebody with experience in this could comment on these concerns.
>>>>>
>>>>> Thanks,
>>>>> Zoran
>>>>>
>>>>> On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <
>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>
>>>>>> Jorn meant something like this:
>>>>>>
>>>>>> val filteredStream = twitterStream.transform(rdd =>{
>>>>>>
>>>>>> val newRDD =
>>>>>>   ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(x => (x,1))
>>>>>>
>>>>>> rdd.join(newRDD)
>>>>>>
>>>>>> })
>>>>>>
>>>>>> newRDD will work like a filter when you do the join.
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <
>>>>>> zoran.jere...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jorn,
>>>>>>>
>>>>>>> I didn't know that it is possible to change the filter without
>>>>>>> re-opening the Twitter stream. Actually, I already asked that question
>>>>>>> earlier on Stack Overflow
>>>>>>> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
>>>>>>> and got the answer that it's not possible, but it would be even better
>>>>>>> if there is some other way to add new hashtags or to remove old hashtags
>>>>>>> that a user has stopped following. I guess the second request would be
>>>>>>> more difficult.
>>>>>>>
>>>>>>> However, it would be great if you could give me a short example of how
>>>>>>> to do this. I didn't quite understand from your explanation what you
>>>>>>> mean by "join it with a rdd loading the newest hash tags from disk in a
>>>>>>> regular interval".
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Zoran
>>>>>>>
>>>>>>> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Why do you even want to stop it? You can join it with a rdd loading
>>>>>>>> the newest hash tags from disk in a regular interval
>>>>>>>>
>>>>>>>> On Sun, Jul 19, 2015 at 7:40, Zoran Jeremic <
>>>>>>>> zoran.jere...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have a Twitter Spark stream initialized in the following way:
>>>>>>>>>
>>>>>>>>>     val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>>>>>>>     val config = getTwitterConfigurationBuilder.build()
>>>>>>>>>     val auth: Option[twitter4j.auth.Authorization] =
>>>>>>>>>       Some(new twitter4j.auth.OAuthAuthorization(config))
>>>>>>>>>     val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>>>>>>
>>>>>>>>> This works fine when I initially start it. However, at some point I
>>>>>>>>> need to update the filters, since users might add new hashtags they want
>>>>>>>>> to follow. I tried to stop the running stream and the Spark streaming
>>>>>>>>> context without stopping the Spark context, e.g.:
>>>>>>>>>
>>>>>>>>>     stream.stop()
>>>>>>>>>     ssc.stop(false)
>>>>>>>>>
>>>>>>>>> Afterward, I tried to initialize a new Twitter stream like I
>>>>>>>>> did previously. However, I got this exception:
>>>>>>>>>
>>>>>>>>> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>>>>>>>>>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>>>>>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>>>>>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>>>>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>>>>>>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>>>>>>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>>>>>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>>>>>>     at java.util.TimerThread.run(Timer.java:505)
>>>>>>>>> INFO    [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83)         Inflater has been closed
>>>>>>>>> ERROR    [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75)         Error stopping receiver 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>>>>>>
>>>>>>>>> Can anybody explain how to solve this issue?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Zoran
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> *******************************************************************************
>>>>>>> Zoran Jeremic, PhD
>>>>>>> Senior System Analyst & Programmer
>>>>>>>
>>>>>>> Athabasca University
>>>>>>> Tel: +1 604 92 89 944
>>>>>>> E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
>>>>>>> Homepage:  http://zoranjeremic.org
>>>>>>>
>>>>>>> **********************************************************************************
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>


-- 
*******************************************************************************
Zoran Jeremic, PhD
Senior System Analyst & Programmer

Athabasca University
Tel: +1 604 92 89 944
E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
Homepage:  http://zoranjeremic.org
**********************************************************************************
