What you said is correct, and I am expecting the printlns to show up in my console or my Spark UI. I do not see them in either place. However, if you run the program, the printlns do print for the constructor of the receiver and for the foreach statements, with a total count of 0. When you run it in regular mode with no master attached, you will see the counts being printed out in the console as well. Please compile my program and try it out; I have spent significant time debugging where it can go wrong and could not find an answer. I also see the "starting receiver" logs from Spark when no master is defined, but do not see them when there is one. Also, I am running some other simple code with spark-submit with printlns, and I do see those in my Spark UI, but not for Spark Streaming.
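For reference, that other job is roughly like this (a simplified sketch from memory, not the exact code); the println runs on an executor, which is why it shows up under that executor's stdout in the UI:

    package test;

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;

    public class TestPrintln {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext(new SparkConf());
            sc.parallelize(Arrays.asList(1, 2, 3)).foreach(new VoidFunction<Integer>() {
                @Override
                public void call(Integer x) {
                    // Runs on an executor, so this appears in that executor's stdout.
                    System.out.println("Saw: " + x);
                }
            });
            sc.stop();
        }
    }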
Thanks,
Ankit

From: t...@databricks.com
Date: Mon, 20 Apr 2015 13:29:31 -0700
Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver 
attached to master with multiple workers
To: patel7...@hotmail.com
CC: ak...@sigmoidanalytics.com; user@spark.apache.org

Well, the receiver always runs as part of an executor. When running locally (that is, spark-submit without --master), the executor is in the same process as the driver, so you see the printlns. If you are running with --master spark://cluster, then the executors are running in different processes and possibly on different nodes. Hence you don't see the printlns in the output of the driver process. If you look at the output of the executors in the Spark UI, you may find those prints.
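Also, if you want log output you can find in one place regardless of where the receiver runs, one option is to go through log4j rather than println. A minimal sketch (assuming the stock log4j setup Spark ships with; the LoggingReceiver name is hypothetical, and in standalone mode the same logs also end up on disk under the worker's work/<app-id>/<executor-id>/ directory):

    package test;

    import org.apache.log4j.Logger;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;

    // Hypothetical variant of the receiver that logs instead of printing.
    public class LoggingReceiver extends Receiver<String> {
        // Static so the non-serializable Logger is not serialized with the receiver.
        private static final Logger LOG = Logger.getLogger(LoggingReceiver.class);

        public LoggingReceiver() {
            super(StorageLevel.MEMORY_ONLY());
        }

        @Override
        public void onStart() {
            // Goes to the executor's stderr log, viewable from the executor pages in the UI.
            LOG.info("Started LoggingReceiver");
        }

        @Override
        public void onStop() {}
    }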
TD
On Mon, Apr 20, 2015 at 5:16 AM, Ankit Patel <patel7...@hotmail.com> wrote:

The code I've written is simple: it just starts a thread and calls the store method on the Receiver class.
I see this code with printlns working fine when I run:

    spark-submit --jars <jar> --class test.TestCustomReceiver <jar>

However, it does not work when I run the same command with --master spark://masterURL:

    spark-submit --master spark://masterURL --jars <jar> --class test.TestCustomReceiver <jar>

I also tried setting the master in the conf that I create (sketched below), but that does not work either. I do not see the onStart println being printed when I use the --master option. Please advise.
Also, the master I am attaching to has multiple workers across hosts, with many threads available to it.
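The conf-based attempt looked roughly like this (a sketch; masterURL stands in for the real master URL, and the app name is hypothetical):

    SparkConf conf = new SparkConf()
            .setAppName("TestCustomReceiver")  // hypothetical; my real conf may differ
            .setMaster("spark://masterURL");   // instead of passing --master on the command line
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));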
The code is pasted below (classes: TestReceiver, TestCustomReceiver):


package test;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class TestReceiver extends Receiver<String> {

    public TestReceiver() {
        super(StorageLevel.MEMORY_ONLY());
        System.out.println("Ankit: Created TestReceiver");
    }

    @Override
    public void onStart() {
        System.out.println("Start TestReceiver");
        new TestThread().start();
    }

    public void onStop() {}

    @SuppressWarnings("unused")
    private class TestThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    sleep((long) (Math.random() * 3000));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                store("Time: " + System.currentTimeMillis());
            }
        }
    }
}
 
package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TestCustomReceiver {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
        TestReceiver receiver = new TestReceiver();
        JavaReceiverInputDStream<String> stream = ssc.receiverStream(receiver);
        stream.map(new Function() {
            @Override
            public Object call(Object arg0) throws Exception {
                System.out.println("Received: " + arg0);
                return arg0;
            }
        }).foreachRDD(new Function() {
            @Override
            public Object call(Object arg0) throws Exception {
                System.out.println("Total Count: " + ((org.apache.spark.api.java.JavaRDD) arg0).count());
                return arg0;
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

Thanks,
Ankit

Date: Mon, 20 Apr 2015 12:22:03 +0530
Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver 
attached to master with multiple workers
From: ak...@sigmoidanalytics.com
To: patel7...@hotmail.com
CC: user@spark.apache.org

It would be good if you could paste your custom receiver code and the code that you used to invoke it.

Thanks
Best Regards

On Mon, Apr 20, 2015 at 9:43 AM, Ankit Patel <patel7...@hotmail.com> wrote:

I am experiencing a problem with Spark Streaming (Spark 1.2.0): the onStart method is never called on my CustomReceiver when calling spark-submit against a master node with multiple workers. However, Spark Streaming works fine with no master node set. Has anyone noticed this issue?