That was pseudocode; a working version would look like this:

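    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.twitter.TwitterUtils

    // ssc is the existing StreamingContext; None falls back to twitter4j's default OAuth settings
    val stream = TwitterUtils.createStream(ssc, None)

    // Extract the hashtags from each status and lower-case them
    val hashTags = stream
      .flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
      .map(_.toLowerCase)

    // Count each hashtag over a 10-second window and sort by count, descending
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))
      .map { case (count, topic) => (topic, count) }

    topCounts10.print()

    // Keep only the hashtags of interest by joining each batch with them
    val filteredStream = topCounts10.transform(rdd => {
      val samplehashtags = ssc.sparkContext.parallelize(
        Array("#RobinWilliams", "#android", "#iphone").map(_.toLowerCase))
      val newRDD = samplehashtags.map { x => (x, 1) }
      // Inner join: only keys present in both RDDs survive
      newRDD.join(rdd)
    })

    filteredStream.print()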
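
For intuition, the join in the transform step acts as a lookup filter: a hashtag survives only if it appears on both sides. Here is a minimal sketch of that idea using plain Scala collections, with no Spark involved. The object and method names are made up for illustration, a Set membership test stands in for the RDD join, and a plain list of strings stands in for the tweet stream:

```scala
// Plain-Scala analogy of the pipeline above; no Spark dependency.
// All names here are hypothetical and exist only for illustration.
object HashtagJoinSketch {
  // Split on whitespace, keep tokens starting with '#', lower-case them.
  def extractHashtags(text: String): Seq[String] =
    text.split("\\s+").toSeq.filter(_.startsWith("#")).map(_.toLowerCase)

  // Count occurrences per hashtag, similar to reducing (tag, 1) pairs by key.
  def countHashtags(texts: Seq[String]): Map[String, Int] =
    texts.flatMap(extractHashtags).groupBy(identity).map { case (tag, xs) => (tag, xs.size) }

  // Stand-in for the inner join: only tags present in `wanted` survive.
  def joinAsFilter(wanted: Set[String], counts: Map[String, Int]): Map[String, Int] =
    counts.filter { case (tag, _) => wanted.contains(tag) }

  def main(args: Array[String]): Unit = {
    val tweets = Seq("Loving my new #Android phone", "#android vs #iPhone", "no tags here")
    val wanted = Set("#robinwilliams", "#android", "#iphone")
    // Print the surviving hashtags, most frequent first.
    joinAsFilter(wanted, countHashtags(tweets)).toSeq.sortBy(-_._2).foreach(println)
  }
}
```

Changing the contents of `wanted` between calls changes what gets through without touching the input, which is the same reason the stream does not need to be reopened when the hashtag list changes.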

Thanks
Best Regards

On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic <zoran.jere...@gmail.com>
wrote:

> Hi Akhil and Jorn,
>
> I tried, as you suggested, to create a simple scenario, but I get an
> error on rdd.join(newRDD): "value join is not a member of
> org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like this:
>
>     val stream = TwitterUtils.createStream(ssc, auth)
>     val filteredStream = stream.transform(rdd => {
>       val samplehashtags = Array("music", "film")
>       val newRDD = samplehashtags.map { x => (x, 1) }
>       rdd.join(newRDD)
>     })
>
> Did I miss something here?
>
> Thanks,
> Zoran
>
> On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com>
> wrote:
>
>> Thanks for the explanation.
>>
>> If I understand this correctly, in this approach I would actually stream
>> everything from Twitter and perform the filtering in my application using
>> Spark. Isn't this too much overhead if my application is only interested in
>> a few hundred or a few thousand hashtags?
>> On one hand, this would be a better approach, since I would not have to
>> open new streams if the number of hashtags goes over 400, which is the
>> Twitter limit for user stream filtering. On the other hand, I'm concerned
>> about how much it will affect application performance if I stream
>> everything that is posted on Twitter and filter it locally. It would be
>> great if somebody with experience on this could comment on these concerns.
>>
>> Thanks,
>> Zoran
>>
>> On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Jorn meant something like this:
>>>
>>> val filteredStream = twitterStream.transform(rdd => {
>>>   val newRDD = ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
>>>   rdd.join(newRDD)
>>> })
>>>
>>> newRDD will work like a filter when you do the join.
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jorn,
>>>>
>>>> I didn't know that it is possible to change the filter without re-opening
>>>> the Twitter stream. Actually, I already asked that question earlier on
>>>> Stack Overflow
>>>> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
>>>> and got the answer that it's not possible, but it would be even better if
>>>> there were some other way to add new hashtags or to remove old hashtags that
>>>> a user has stopped following. I guess the second request would be more difficult.
>>>>
>>>> However, it would be great if you could give me a short example of how to
>>>> do this. I didn't fully understand from your explanation what you mean by
>>>> "join it with a rdd loading the newest hash tags from disk in a regular
>>>> interval".
>>>>
>>>> Thanks,
>>>> Zoran
>>>>
>>>> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com>
>>>> wrote:
>>>>
>>>>> Why do you even want to stop it? You can join it with a rdd loading
>>>>> the newest hash tags from disk in a regular interval
>>>>>
>>>>> On Sun, Jul 19, 2015 at 7:40, Zoran Jeremic <zoran.jere...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a Spark Twitter stream initialized in the following way:
>>>>>>
>>>>>>     val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>>>>     val config = getTwitterConfigurationBuilder.build()
>>>>>>     val auth: Option[twitter4j.auth.Authorization] =
>>>>>>       Some(new twitter4j.auth.OAuthAuthorization(config))
>>>>>>     val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>>>
>>>>>> This works fine when I initially start it. However, at some point I
>>>>>> need to update the filters, since users might add new hashtags they want to
>>>>>> follow. I tried to stop the running stream and the Spark streaming context
>>>>>> without stopping the Spark context, e.g.:
>>>>>>
>>>>>>     stream.stop()
>>>>>>     ssc.stop(false)
>>>>>>
>>>>>> Afterward, I try to initialize a new Twitter stream like I did
>>>>>> previously. However, I get this exception:
>>>>>>
>>>>>> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>>>>>>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>>>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>>>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>>>     at java.util.TimerThread.run(Timer.java:505)
>>>>>> INFO    [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83)         Inflater has been closed
>>>>>> ERROR    [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75)         Error stopping receiver 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>>>
>>>>>> Can anybody explain how to solve this issue?
>>>>>>
>>>>>> Thanks,
>>>>>> Zoran
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> *******************************************************************************
>>>> Zoran Jeremic, PhD
>>>> Senior System Analyst & Programmer
>>>>
>>>> Athabasca University
>>>> Tel: +1 604 92 89 944
>>>> E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
>>>> Homepage:  http://zoranjeremic.org
>>>>
>>>> **********************************************************************************
>>>>
>>>
>>>
>>
>>
>>
>
>
>
>
