Hi Ryan,

It happens on the driver side, and I am running in client mode (not cluster mode).
Thanks!

On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> Sorry, there is a typo in my previous email: this may **not** be the root
> cause if the leaked threads are on the driver side.
>
> Does it happen in the driver or the executors?
>
> On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Ryan,
>>
>> Ahh, my Receiver.onStop method is currently empty.
>>
>> 1) I have a hard time seeing why the receiver would crash so many times
>> within a span of 4 to 5 hours, but anyway, I understand I should still
>> clean up in onStop.
>>
>> 2) How do I clean up those threads? The documentation here
>> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't seem
>> to have any method for cleaning up the threads created in onStart.
>> Any ideas?
>>
>> Thanks!
>>
>> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>
>>> So in your code, each Receiver will start a new thread. Did you stop the
>>> receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
>>> after a receiver crashes and is restarted by Spark. However, this may be
>>> the root cause since the leaked threads are on the driver side. Could you
>>> use `jstack` to check which types of threads are leaking?
>>>
>>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> I am also under the assumption that the *onStart* function of the Receiver
>>>> is called only once by Spark. Please correct me if I am wrong.
>>>>
>>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> My driver program runs a Spark Streaming job, and it spawns a thread
>>>>> by itself only in the *onStart()* function below. Other than that, it
>>>>> doesn't spawn any other threads. It only calls mapToPair, reduceByKey,
>>>>> foreachRDD, and collect.
>>>>>
>>>>> import org.apache.spark.storage.StorageLevel;
>>>>> import org.apache.spark.streaming.receiver.Receiver;
>>>>>
>>>>> public class NSQReceiver extends Receiver<String> {
>>>>>
>>>>>     private String topic = "";
>>>>>
>>>>>     public NSQReceiver(String topic) {
>>>>>         super(StorageLevel.MEMORY_AND_DISK_2());
>>>>>         this.topic = topic;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void onStart() {
>>>>>         // A new, unreferenced thread is started on every call.
>>>>>         new Thread() {
>>>>>             @Override public void run() {
>>>>>                 receive(); // receive() is not shown
>>>>>             }
>>>>>         }.start();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void onStop() {
>>>>>         // currently empty
>>>>>     }
>>>>> }
>>>>>
>>>>> Environment info:
>>>>>
>>>>> Java 8
>>>>> Scala 2.11.8
>>>>> Spark 2.0.0
>>>>>
>>>>> More than happy to share any other info you may need.
>>>>>
>>>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <ja...@odersky.com> wrote:
>>>>>
>>>>>> > how do I tell my spark driver program to not create so many?
>>>>>>
>>>>>> This may depend on your driver program. Do you spawn any threads in
>>>>>> it? Could you share some more information on the driver program, Spark
>>>>>> version and your environment? It would greatly help others to help you.
>>>>>>
>>>>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>>> > The source of my problem is actually that I am running into the
>>>>>> > following error. This error seems to happen after running my driver
>>>>>> > program for 4 hours.
>>>>>> >
>>>>>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>>>>>> > "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
>>>>>> > java.lang.OutOfMemoryError: unable to create new native thread"
>>>>>> >
>>>>>> > and this wonderful book taught me that the error "unable to create new
>>>>>> > native thread" can happen because the JVM asks the OS for a thread and
>>>>>> > the OS refuses for one of the following reasons:
>>>>>> >
>>>>>> > 1. The system has actually run out of virtual memory.
>>>>>> > 2. On Unix-style systems, the user has already created (between all
>>>>>> >    programs the user is running) the maximum number of processes
>>>>>> >    configured for that user login. Individual threads are considered
>>>>>> >    a process in that regard.
>>>>>> >
>>>>>> > Option #2 is ruled out in my case because my driver program is running
>>>>>> > as root, whose maximum number of user processes is set to 120242.
>>>>>> >
>>>>>> > ulimit -a gives me the following:
>>>>>> >
>>>>>> > core file size          (blocks, -c) 0
>>>>>> > data seg size           (kbytes, -d) unlimited
>>>>>> > scheduling priority             (-e) 0
>>>>>> > file size               (blocks, -f) unlimited
>>>>>> > pending signals                 (-i) 120242
>>>>>> > max locked memory       (kbytes, -l) 64
>>>>>> > max memory size         (kbytes, -m) unlimited
>>>>>> > open files                      (-n) 1024
>>>>>> > pipe size            (512 bytes, -p) 8
>>>>>> > POSIX message queues     (bytes, -q) 819200
>>>>>> > real-time priority              (-r) 0
>>>>>> > stack size              (kbytes, -s) 8192
>>>>>> > cpu time               (seconds, -t) unlimited
>>>>>> > max user processes              (-u) 120242
>>>>>> > virtual memory          (kbytes, -v) unlimited
>>>>>> > file locks                      (-x) unlimited
>>>>>> >
>>>>>> > So at this point I do understand that I am running out of memory due
>>>>>> > to the allocation of threads, so my biggest question is: how do I tell
>>>>>> > my spark driver program to not create so many?
>>>>>> >
>>>>>> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <so...@cloudera.com> wrote:
>>>>>> >>
>>>>>> >> ps -L [pid] is what shows threads. I am not sure this is counting what
>>>>>> >> you think it does. My shell process has about a hundred threads, and I
>>>>>> >> can't imagine why one would have thousands unless your app spawned them.
>>>>>> >>
>>>>>> >> On Mon, Oct 31, 2016 at 10:20 AM kant kodali <kanth...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> When I do
>>>>>> >>>
>>>>>> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>>>>> >>>
>>>>>> >>> the result is around 32K. Why does it create so many threads? How can
>>>>>> >>> I limit this?
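On the question of how to clean up the threads started in onStart: java.lang.Thread has no cleanup method, because a thread ends when its run() method returns. A common pattern is to keep a reference to the thread, have its loop check a stop flag, and have onStop set that flag, interrupt the thread, and wait for it to exit. The following is a minimal sketch in plain Java with no Spark dependency; `StoppableWorker` and the thread name `nsq-receiver-worker` are hypothetical, and its `isStopped()` only imitates the method a Spark `Receiver` already provides. It assumes the real blocking read either responds to interruption or is unblocked by closing the underlying client in onStop.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for a Spark Receiver's lifecycle (not Spark's API):
 * onStart() spawns exactly one worker and remembers it; onStop() makes sure
 * that worker has exited, so a restart does not leave an orphaned thread.
 */
class StoppableWorker {
    static final String THREAD_NAME = "nsq-receiver-worker";

    private volatile boolean stopped = true;
    private Thread worker;

    public synchronized void onStart() {
        stopped = false;
        // Keep a reference instead of an anonymous `new Thread() {...}.start()`.
        worker = new Thread(this::receiveLoop, THREAD_NAME);
        worker.setDaemon(true);
        worker.start();
    }

    public synchronized void onStop() {
        stopped = true;
        Thread t = worker;
        worker = null;
        if (t == null) {
            return;
        }
        t.interrupt(); // wakes the worker if it is blocked in sleep()/wait()
        try {
            t.join(TimeUnit.SECONDS.toMillis(5)); // bounded wait for run() to return
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Imitates Receiver.isStopped(); a real Receiver already has this method. */
    public boolean isStopped() {
        return stopped;
    }

    private void receiveLoop() {
        while (!isStopped() && !Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(10); // placeholder for one blocking read from the source
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the flag so the loop exits
            }
        }
        // Returning from run() is what actually ends the thread.
    }

    public static void main(String[] args) {
        StoppableWorker w = new StoppableWorker();
        for (int i = 0; i < 100; i++) { // simulate 100 stop/restart cycles
            w.onStart();
            w.onStop();
        }
        long alive = Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().equals(THREAD_NAME))
                .count();
        System.out.println("leftover worker threads: " + alive);
    }
}
```

Running main simulates repeated restarts and prints how many worker threads are still alive, which should be zero. In an actual `Receiver<String>`, the same field and method bodies would live in the receiver class, with the loop calling the inherited `isStopped()`. As Ryan notes, unstopped receiver threads only explain the growth if receivers are actually being restarted, so this is worth pairing with a thread dump.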
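Ryan's `jstack` suggestion and Sean's `ps -L [pid]` both count threads from outside the JVM. As an in-process complement, a hypothetical `ThreadCensus` helper can group the JVM's live threads by name with every run of digits collapsed to `#`, so that a family such as `ForkJoinPool-#-worker-#` stands out if it keeps growing between snapshots. This assumes thread names are informative: Spark's named threads such as `dag-scheduler-event-loop` are, while anonymous `new Thread()` instances get default names like `Thread-123` and would all land in the `Thread-#` bucket.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical diagnostic: counts live JVM threads grouped by name "family". */
class ThreadCensus {
    /** Collapses every run of digits: "ForkJoinPool-50-worker-11" -> "ForkJoinPool-#-worker-#". */
    static String family(String threadName) {
        return threadName.replaceAll("\\d+", "#");
    }

    /** Snapshot of live threads, keyed by family, sorted by family name. */
    static Map<String, Integer> census() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(family(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Largest families first; a driver could log this periodically and
        // compare snapshots to see which family grows over time.
        census().entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .forEach(e -> System.out.printf("%6d  %s%n", e.getValue(), e.getKey()));
    }
}
```

`Thread.getAllStackTraces()` captures a stack for every thread, so it is better suited to occasional sampling than to a tight loop.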