I am also under the assumption that the *onStart()* function of the Receiver is called only once by Spark. Please correct me if I am wrong.
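As a side note, a receiver's onStart() is expected to spawn a thread that can also be shut down cleanly, since Spark pairs it with onStop(). Below is a minimal, self-contained sketch (plain Java, no Spark dependency; the class and method names mirror the Receiver contract but are illustrative) of a stoppable worker thread of the kind onStart() should start:

```java
// Sketch only: a stoppable receive loop, assuming the real receive()
// is a blocking call that can be interrupted. Class name is hypothetical.
public class StoppableWorker {
    private volatile boolean stopped = false;
    private Thread worker;

    public void onStart() {
        worker = new Thread(() -> {
            while (!stopped) {
                // receive() would go here; sleep stands in for blocking I/O
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return; // interrupted by onStop(): exit the loop
                }
            }
        }, "receiver-worker");
        worker.setDaemon(true);
        worker.start();
    }

    public void onStop() throws InterruptedException {
        stopped = true;        // signal the loop to exit
        worker.interrupt();    // wake it if it is blocked
        worker.join(1000);     // wait briefly for a clean exit
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableWorker w = new StoppableWorker();
        w.onStart();
        Thread.sleep(50);
        w.onStop();
        System.out.println("worker alive after stop: " + w.worker.isAlive());
    }
}
```

A thread started this way exits promptly when the receiver is stopped or restarted, instead of leaking one native thread per restart.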
On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com> wrote:

> My driver program runs a spark streaming job, and it spawns a thread by
> itself only in the *onStart()* function below. Other than that it doesn't
> spawn any other threads. It only calls the mapToPair, reduceByKey,
> foreachRDD, and collect functions.
>
> public class NSQReceiver extends Receiver<String> {
>
>     private String topic = "";
>
>     public NSQReceiver(String topic) {
>         super(StorageLevel.MEMORY_AND_DISK_2());
>         this.topic = topic;
>     }
>
>     @Override
>     public void onStart() {
>         new Thread() {
>             @Override
>             public void run() {
>                 receive();
>             }
>         }.start();
>     }
> }
>
> Environment info:
>
> Java 8
> Scala 2.11.8
> Spark 2.0.0
>
> More than happy to share any other info you may need.
>
> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <ja...@odersky.com> wrote:
>
>> > how do I tell my spark driver program to not create so many?
>>
>> This may depend on your driver program. Do you spawn any threads in it?
>> Could you share some more information on the driver program, Spark
>> version, and your environment? It would greatly help others to help you.
>>
>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>> > The source of my problem is actually that I am running into the
>> > following error. This error seems to happen after running my driver
>> > program for 4 hours:
>> >
>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>> > "dag-scheduler-event-loop" Exception in thread
>> > "ForkJoinPool-50-worker-13" java.lang.OutOfMemoryError: unable to
>> > create new native thread"
>> >
>> > and this wonderful book taught me that the error "unable to create new
>> > native thread" can happen because the JVM requests a thread from the
>> > OS, and the OS refuses, for one of the following reasons:
>> >
>> > 1. The system has actually run out of virtual memory.
>> > 2. On Unix-style systems, the user has already created (across all
>> > programs the user is running) the maximum number of processes
>> > configured for that user login. Individual threads are counted as
>> > processes in that regard.
>> >
>> > Option #2 is ruled out in my case because my driver program is running
>> > with a userid of root, which has the maximum number of processes set to
>> > 120242.
>> >
>> > ulimit -a gives me the following:
>> >
>> > core file size          (blocks, -c) 0
>> > data seg size           (kbytes, -d) unlimited
>> > scheduling priority             (-e) 0
>> > file size               (blocks, -f) unlimited
>> > pending signals                 (-i) 120242
>> > max locked memory       (kbytes, -l) 64
>> > max memory size         (kbytes, -m) unlimited
>> > open files                      (-n) 1024
>> > pipe size            (512 bytes, -p) 8
>> > POSIX message queues     (bytes, -q) 819200
>> > real-time priority              (-r) 0
>> > stack size              (kbytes, -s) 8192
>> > cpu time               (seconds, -t) unlimited
>> > max user processes              (-u) 120242
>> > virtual memory          (kbytes, -v) unlimited
>> > file locks                      (-x) unlimited
>> >
>> > So at this point I do understand that I am running out of memory due to
>> > the allocation of threads, so my biggest question is: how do I tell my
>> > Spark driver program to not create so many?
>> >
>> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <so...@cloudera.com> wrote:
>> >
>> >> ps -L [pid] is what shows threads. I am not sure this is counting what
>> >> you think it does. My shell process has about a hundred threads, and I
>> >> can't imagine why one would have thousands unless your app spawned
>> >> them.
>> >>
>> >> On Mon, Oct 31, 2016 at 10:20 AM, kant kodali <kanth...@gmail.com>
>> >> wrote:
>> >>
>> >>> when I do
>> >>>
>> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>> >>>
>> >>> the result is around 32K. Why does it create so many threads, and how
>> >>> can I limit this?
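To cross-check what ps reports against what the JVM itself sees, a minimal sketch using the standard java.lang.management API, which reports live and peak thread counts from inside the driver process (class name is illustrative):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Sketch: print the JVM's own view of its thread usage. Running this
// periodically in the driver would show whether the count keeps growing.
public class ThreadDiagnostics {
    public static void main(String[] args) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // Threads alive right now in this JVM
        System.out.println("live: " + threads.getThreadCount());
        // High-water mark since JVM startup (or since resetPeakThreadCount)
        System.out.println("peak: " + threads.getPeakThreadCount());
    }
}
```

If the JVM's own live-thread count stays modest while ps -elfT shows tens of thousands of entries, the discrepancy is likely in what the ps pipeline is matching rather than in actual thread creation.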