The code I've written is simple: it just starts a thread and calls the store method on the Receiver class. I see this code (printlns included) working fine when I run:

spark-submit --jars <jar> --class test.TestCustomReceiver <jar>

However, it does not work when I run the same command with the --master option:

spark-submit --master spark://masterURL --jars <jar> --class test.TestCustomReceiver <jar>

I also tried setting the master in the SparkConf that I create, but that does not work either. I do not see the onStart println being printed when I use the --master option. Please advise. Also, the master I am attaching to has multiple workers across hosts, with many threads available to it. The code is pasted below (classes: TestReceiver, TestCustomReceiver):
package test;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class TestReceiver extends Receiver<String> {

    public TestReceiver() {
        super(StorageLevel.MEMORY_ONLY());
        System.out.println("Ankit: Created TestReceiver");
    }

    @Override
    public void onStart() {
        System.out.println("Start TestReceiver");
        new TestThread().start();
    }

    @Override
    public void onStop() {}

    private class TestThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    sleep((long) (Math.random() * 3000));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                store("Time: " + System.currentTimeMillis());
            }
        }
    }
}

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TestCustomReceiver {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
        TestReceiver receiver = new TestReceiver();
        JavaReceiverInputDStream<String> stream = ssc.receiverStream(receiver);
        stream.map(new Function<String, String>() {
            @Override
            public String call(String line) throws Exception {
                System.out.println("Received: " + line);
                return line;
            }
        }).foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) throws Exception {
                System.out.println("Total Count: " + rdd.count());
                return null;
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

Thanks,
Ankit

Date: Mon, 20 Apr 2015 12:22:03 +0530
Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers
From: ak...@sigmoidanalytics.com
To: patel7...@hotmail.com
CC: user@spark.apache.org

Would be good if you can paste your custom receiver code and the code that you used to invoke it.

Thanks
Best Regards

On Mon, Apr
20, 2015 at 9:43 AM, Ankit Patel <patel7...@hotmail.com> wrote:

I am experiencing a problem with Spark Streaming (Spark 1.2.0): the onStart method is never called on my CustomReceiver when calling spark-submit against a master node with multiple workers. However, Spark Streaming works fine with no master node set. Has anyone noticed this issue?
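One common cause of this symptom (an assumption here, not confirmed in the thread) is that, with a standalone master, the receiver and the map printlns run on executors, so their stdout goes to the worker logs rather than the driver console, and a receiver also occupies one core, so the application needs more cores than receivers for any processing to happen. A hedged sketch of the submit command, keeping the thread's masterURL and <jar> placeholders:

```shell
# Sketch only: masterURL and <jar> are placeholders from the thread.
# Receiver printlns land in the executor's log on the worker, e.g.:
#   $SPARK_HOME/work/<app-id>/<executor-id>/stderr
# Allocate more cores than receivers (here: 1 receiver, 4 cores):
spark-submit \
  --master spark://masterURL:7077 \
  --total-executor-cores 4 \
  --jars <jar> \
  --class test.TestCustomReceiver \
  <jar>
```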