java.lang.RuntimeException: java.lang.AssertionError: assertion failed: A
ReceiverSupervisor has not been attached to the receiver yet. Maybe you are
starting some computation in the receiver before the Receiver.onStart() has been
called.

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.streaming.Duration;

public class SparkStreamingReceiver extends Receiver<String> {

    private static final SparkStreamReceivingServer sparkStreamReceivingServer =
            new SparkStreamReceivingServer();
    private static final SparkStreamingReceiver sparkStreamingReceiver =
            new SparkStreamingReceiver();

    public SparkStreamingReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        new Thread() {
            @Override
            public void run() {
                sparkStreamReceivingServer.start();
            }
        }.start();
    }

    @Override
    public void onStop() {
        sparkStreamReceivingServer.stop();
    }

    public static void receive(String jsonMessage) {
        sparkStreamingReceiver.store(jsonMessage);
    }

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaCustomReceiver")
                .setMaster("local");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
        JavaReceiverInputDStream<String> jsonMessagesDStream =
                ssc.receiverStream(sparkStreamingReceiver);
        jsonMessagesDStream.print();
        ssc.start();
        ssc.awaitTermination();
    }
}

I have the custom receiver above, which I wrote by following this example:

https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
I read in an article that
"Receivers must have ReceiverSupervisors attached before they can be started
since store and management methods simply
pass calls on to the respective methods in the ReceiverSupervisor"
but in the example code above I don't see any ReceiverSupervisor, and I am not
sure how to modify my code to make this work. Any suggestions?
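
To check that I understand the quoted statement, here is how I picture it in plain Java. This is only a sketch with no Spark dependency: Supervisor, SketchReceiver, GoodReceiver and run are names I made up for illustration, not Spark classes, and I am assuming the real Receiver.store() delegates to its supervisor in roughly this way.

```java
import java.util.ArrayList;
import java.util.List;

public class SupervisorSketch {

    /** Hypothetical stand-in for the component that actually stores records. */
    static class Supervisor {
        final List<String> stored = new ArrayList<>();

        void push(String record) {
            stored.add(record);
        }
    }

    /** Hypothetical stand-in for a receiver; not Spark's Receiver class. */
    static abstract class SketchReceiver {
        private Supervisor supervisor; // stays null until the framework attaches one

        void attachSupervisor(Supervisor s) {
            this.supervisor = s;
        }

        /** Passes the record on to the supervisor, or fails if none is attached. */
        void store(String record) {
            if (supervisor == null) {
                throw new IllegalStateException("supervisor not attached yet");
            }
            supervisor.push(record);
        }

        abstract void onStart();
    }

    /** Receiver that calls store() on itself, and only from inside onStart(). */
    static class GoodReceiver extends SketchReceiver {
        @Override
        void onStart() {
            store("hello");
        }
    }

    /** What I assume the framework does: attach a supervisor, then call onStart(). */
    static Supervisor run(SketchReceiver receiver) {
        Supervisor supervisor = new Supervisor();
        receiver.attachSupervisor(supervisor);
        receiver.onStart();
        return supervisor;
    }

    /** Returns true if store() is rejected on a receiver that nobody has started. */
    static boolean storeFailsBeforeAttach() {
        SketchReceiver neverStarted = new GoodReceiver();
        try {
            neverStarted.store("too early");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(new GoodReceiver()).stored); // prints [hello]
        System.out.println(storeFailsBeforeAttach());       // prints true
    }
}
```

In this model store() only works on the receiver instance that the framework itself started. That makes me suspect the problem is my static receive() method calling store() on the static sparkStreamingReceiver field, but I have not confirmed that this is how Spark behaves.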
Thanks!
