Oh - and one other note on this, which appears to be the case.

If, in your stream's foreachRDD implementation, you do something stupid
(like call rdd.count())

    tweetStream.foreachRDD((rdd, lent) => {
      tweetStream.repartition(1)
      numTweetsCollected += 1
      // val count = rdd.count()  // DON'T DO THIS!
    })

You can also get stuck in a situation where your RDD processor blocks
indefinitely.

And for Twitter-specific stuff, make sure to look at modifying the
TwitterInputDStream class so that it includes the changes from SPARK-2464;
the problem described there can lead to infinite stream reopening as well.



On Tue, Oct 21, 2014 at 11:02 AM, jay vyas <jayunit100.apa...@gmail.com>
wrote:

> Hi Spark!  I found out why my RDDs weren't coming through in my Spark
> stream.
>
> It turns out that onStart() needs to return, it seems, i.e. you
> need to launch the worker part of your
> start process in a thread.  For example:
>
> def onStartMock(): Unit = {
>       // Run the receive loop on its own thread so this method returns.
>       val worker = new Thread(new Runnable() {
>         def run() {
>           for (x <- 1 until 1000000000) {
>             val newMem = Runtime.getRuntime.freeMemory() / 12188091
>             if (newMem != lastMem) {
>               System.out.println("    in thread : " + newMem)
>             }
>             lastMem = newMem
>             store(mockStatus)
>           }
>         }})
>       worker.start()
> }
>
> Hope that helps somebody in the same situation.  FYI, it's in the docs :)
>
>  * {{{
>  *  class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) {
>  *      def onStart() {
>  *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
>  *          // Must start new thread to receive data, as onStart() must be non-blocking.
>  *
>  *          // Call store(...) in those threads to store received data into Spark's memory.
>  *
>  *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
>  *          // different errors needs to be handled.
>  *
>  *          // See corresponding method documentation for more details
>  *      }
>  *
>  *      def onStop() {
>  *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
>  *      }
>  *  }
>  * }}}
>
>
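
For anyone who wants to try the onStart() point from the quoted message
without a cluster, here is a minimal sketch that runs without Spark on the
classpath. SketchReceiver and MockStatusReceiver are hypothetical stand-ins
I made up for illustration, not Spark's real receiver classes; the stand-in
only assumes the same contract the docs describe (onStart() must not block,
store(...) is called from the worker thread).

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a receiver base class (NOT the Spark API):
// store() simply appends to an in-memory queue.
abstract class SketchReceiver[T] {
  val stored = new ConcurrentLinkedQueue[T]()
  private val stopped = new AtomicBoolean(false)

  def store(item: T): Unit = stored.add(item)
  def isStopped: Boolean = stopped.get()

  // Flag the receiver as stopped, then let the subclass clean up.
  def stop(): Unit = { stopped.set(true); onStop() }

  def onStart(): Unit
  def onStop(): Unit
}

class MockStatusReceiver extends SketchReceiver[String] {
  @volatile private var worker: Thread = _

  // Returns right away: the receive loop runs on its own thread.
  def onStart(): Unit = {
    worker = new Thread(new Runnable {
      def run(): Unit = {
        while (!isStopped) {
          store("mock status")
          Thread.sleep(10)
        }
      }
    }, "mock-status-worker")
    worker.setDaemon(true)
    worker.start()
  }

  // Give the worker up to a second to notice the stop flag and exit.
  def onStop(): Unit = if (worker != null) worker.join(1000)
}
```

Calling onStart() on a MockStatusReceiver comes back immediately while the
queue keeps filling in the background, and stop() ends the loop. If the
while loop were placed directly in onStart() instead, the caller would
never get control back, which is the situation described above.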


-- 
jay vyas
