The code I've written is simple: it just starts a thread that calls the store
method on the Receiver class.
I see this code (with its printlns) working fine when I run:

    spark-submit --jars <jar> --class test.TestCustomReceiver <jar>

However, it does not work when I run the same command with --master spark://masterURL:

    spark-submit --master spark://masterURL --jars <jar> --class test.TestCustomReceiver <jar>
I also tried setting the master in the conf that I create, but that does not
work either. I do not see the onStart println being printed when I use the
--master option. Please advise.
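
For reference, setting the master programmatically looks roughly like this (a
minimal sketch; "spark://masterURL:7077" is a placeholder for my actual
standalone master URL, 7077 being the default standalone port):

    SparkConf conf = new SparkConf()
            .setAppName("TestCustomReceiver")
            // "spark://masterURL:7077" is a placeholder for the real master URL
            .setMaster("spark://masterURL:7077");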
Also, the master I am connecting to has multiple workers across hosts, with
many threads available to it.
The code is pasted below (classes: TestReceiver, TestCustomReceiver):


package test;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class TestReceiver extends Receiver<String> {

    public TestReceiver() {
        super(StorageLevel.MEMORY_ONLY());
        System.out.println("Ankit: Created TestReceiver");
    }

    @Override
    public void onStart() {
        System.out.println("Start TestReceiver");
        new TestThread().start();
    }

    @Override
    public void onStop() {}

    // Generates a record every 0-3 seconds and hands it to Spark via store()
    private class TestThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    sleep((long) (Math.random() * 3000));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                store("Time: " + System.currentTimeMillis());
            }
        }
    }
}
 
package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TestCustomReceiver {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // 1-second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

        TestReceiver receiver = new TestReceiver();
        JavaReceiverInputDStream<String> stream = ssc.receiverStream(receiver);

        stream.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                System.out.println("Received: " + s);
                return s;
            }
        }).foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) throws Exception {
                System.out.println("Total Count: " + rdd.count());
                return null;
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}

Thanks,
Ankit

Date: Mon, 20 Apr 2015 12:22:03 +0530
Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver 
attached to master with multiple workers
From: ak...@sigmoidanalytics.com
To: patel7...@hotmail.com
CC: user@spark.apache.org

It would be good if you could paste your custom receiver code and the code
that you used to invoke it.

Thanks
Best Regards

On Mon, Apr 20, 2015 at 9:43 AM, Ankit Patel <patel7...@hotmail.com> wrote:

I am experiencing a problem with Spark Streaming (Spark 1.2.0): the onStart
method is never called on my CustomReceiver when I run spark-submit against a
master node with multiple workers. However, Spark Streaming works fine with no
master node set. Has anyone noticed this issue?