Sorry, there is a typo in my previous email: this may **not** be the root cause if the leaked threads are on the driver side.
Does it happen in the driver or executors?

On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Ryan,
>
> Ahh, my Receiver.onStop method is currently empty.
>
> 1) I have a hard time seeing why the receiver would crash so many times
> within a span of 4 to 5 hours, but anyway I understand I should still
> clean up during onStop.
>
> 2) How do I clean up those threads? The documentation here
> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't
> seem to have any method where I can clean up the threads created during
> onStart. Any ideas?
>
> Thanks!
>
> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> So in your code, each Receiver will start a new thread. Did you stop the
>> receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
>> after a receiver crashes and is restarted by Spark. However, this may be
>> the root cause since the leak threads are in the driver side. Could you
>> use `jstack` to check which types of threads are leaking?
>>
>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> I am also under the assumption that the *onStart* function of the
>>> Receiver is only called once by Spark. Please correct me if I am wrong.
>>>
>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> My driver program runs a Spark Streaming job, and it spawns a thread
>>>> by itself only in the *onStart()* function below. Other than that it
>>>> doesn't spawn any other threads. It only calls the mapToPair,
>>>> reduceByKey, foreachRDD, and collect functions.
>>>>
>>>> public class NSQReceiver extends Receiver<String> {
>>>>
>>>>     private String topic = "";
>>>>
>>>>     public NSQReceiver(String topic) {
>>>>         super(StorageLevel.MEMORY_AND_DISK_2());
>>>>         this.topic = topic;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void onStart() {
>>>>         new Thread() {
>>>>             @Override
>>>>             public void run() {
>>>>                 receive();
>>>>             }
>>>>         }.start();
>>>>     }
>>>>
>>>> }
>>>>
>>>> Environment info:
>>>>
>>>> Java 8
>>>> Scala 2.11.8
>>>> Spark 2.0.0
>>>>
>>>> More than happy to share any other info you may need.
>>>>
>>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <ja...@odersky.com>
>>>> wrote:
>>>>
>>>>> > how do I tell my spark driver program to not create so many?
>>>>>
>>>>> This may depend on your driver program. Do you spawn any threads in
>>>>> it? Could you share some more information on the driver program, Spark
>>>>> version and your environment? It would greatly help others to help you.
>>>>>
>>>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > The source of my problem is actually that I am running into the
>>>>> > following error. This error seems to happen after running my driver
>>>>> > program for 4 hours:
>>>>> >
>>>>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>>>>> > "dag-scheduler-event-loop" Exception in thread
>>>>> > "ForkJoinPool-50-worker-13"
>>>>> > java.lang.OutOfMemoryError: unable to create new native thread"
>>>>> >
>>>>> > and this wonderful book taught me that the error "unable to create
>>>>> > new native thread" can happen because the JVM is requesting a thread
>>>>> > from the OS and the OS is refusing to create one, for the following
>>>>> > reasons:
>>>>> >
>>>>> > 1. The system has actually run out of virtual memory.
>>>>> > 2. On Unix-style systems, the user has already created (between all
>>>>> > programs the user is running) the maximum number of processes
>>>>> > configured for that user login.
>>>>> > Individual threads are considered a process in that regard.
>>>>> >
>>>>> > Option #2 is ruled out in my case because my driver program is
>>>>> > running with a userid of root, which has the maximum number of
>>>>> > processes set to 120242.
>>>>> >
>>>>> > ulimit -a gives me the following:
>>>>> >
>>>>> > core file size          (blocks, -c) 0
>>>>> > data seg size           (kbytes, -d) unlimited
>>>>> > scheduling priority             (-e) 0
>>>>> > file size               (blocks, -f) unlimited
>>>>> > pending signals                 (-i) 120242
>>>>> > max locked memory       (kbytes, -l) 64
>>>>> > max memory size         (kbytes, -m) unlimited
>>>>> > open files                      (-n) 1024
>>>>> > pipe size            (512 bytes, -p) 8
>>>>> > POSIX message queues     (bytes, -q) 819200
>>>>> > real-time priority              (-r) 0
>>>>> > stack size              (kbytes, -s) 8192
>>>>> > cpu time               (seconds, -t) unlimited
>>>>> > max user processes              (-u) 120242
>>>>> > virtual memory          (kbytes, -v) unlimited
>>>>> > file locks                      (-x) unlimited
>>>>> >
>>>>> > So at this point I do understand that I am running out of memory due
>>>>> > to the allocation of threads, so my biggest question is: how do I
>>>>> > tell my Spark driver program to not create so many?
>>>>> >
>>>>> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <so...@cloudera.com>
>>>>> > wrote:
>>>>> >
>>>>> >> ps -L [pid] is what shows threads. I am not sure this is counting
>>>>> >> what you think it does. My shell process has about a hundred
>>>>> >> threads, and I can't imagine why one would have thousands unless
>>>>> >> your app spawned them.
>>>>> >>
>>>>> >> On Mon, Oct 31, 2016 at 10:20 AM, kant kodali <kanth...@gmail.com>
>>>>> >> wrote:
>>>>> >>
>>>>> >>> when I do
>>>>> >>>
>>>>> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>>>> >>>
>>>>> >>> the result is around 32K. Why does it create so many threads, and
>>>>> >>> how can I limit this?
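[Editor's note] The cleanup Ryan suggests usually amounts to keeping a handle to the thread started in onStart and stopping it in onStop, so a crashed-and-restarted receiver does not leave its old worker behind. Below is a minimal, self-contained sketch of that pattern in plain Java; Spark's real Receiver base class is deliberately omitted so the example compiles on its own, and the class and method names (StoppableReceiver, isWorkerAlive) are illustrative, not the actual NSQReceiver.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: a receiver-like class whose onStop() actually stops the thread
// created in onStart(). Without this, every restart leaks one thread.
class StoppableReceiver {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private Thread worker; // keep a handle so onStop() can reach the thread

    public void onStart() {
        stopped.set(false);
        worker = new Thread(() -> {
            // In a real receiver this loop would call receive()/store(...)
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(10); // placeholder for blocking I/O
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore flag, exit loop
                }
            }
        }, "receiver-worker");
        worker.setDaemon(true);
        worker.start();
    }

    public void onStop() {
        stopped.set(true);          // signal the loop to exit
        if (worker != null) {
            worker.interrupt();     // unblock any blocking call
            try {
                worker.join(1000);  // wait briefly for a clean exit
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public boolean isWorkerAlive() {
        return worker != null && worker.isAlive();
    }
}
```

After each onStart/onStop cycle the worker thread is gone, which can be confirmed with `jstack <driver-pid>` as suggested upthread: leaked receiver threads show up there as an ever-growing group of identically named threads.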