Then it should not be a Receiver issue. Could you use `jstack` to find out the names of the leaking threads?
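[Editor's note: to act on the `jstack` suggestion above, a dump can be grouped by thread-name prefix so a leaking pool stands out. A rough sketch; the helper name `summarize_dump` and the "strip trailing -<n> worker numbers" heuristic are illustrative, not part of jstack itself.]

```shell
# Summarize a jstack thread dump read from stdin: extract each thread's
# name (the quoted string at the start of its stack block), strip any
# trailing "-<n>" worker number, then count occurrences, most common first.
summarize_dump() {
  grep '^"' \
    | sed -e 's/^"//' -e 's/".*$//' -e 's/-[0-9]*$//' \
    | sort | uniq -c | sort -rn
}

# Usage against a live driver JVM (12345 is a hypothetical PID):
#   jstack 12345 | summarize_dump | head
```

A leak like the one in this thread would show one prefix (e.g. a receiver's worker threads) with a count far above everything else.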
On Mon, Oct 31, 2016 at 12:35 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Ryan,
>
> It happens on the driver side and I am running in client mode (not
> cluster mode).
>
> Thanks!
>
> On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
>> Sorry, there is a typo in my previous email: this may **not** be the
>> root cause if the leaked threads are on the driver side.
>>
>> Does it happen in the driver or the executors?
>>
>> On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Ryan,
>>>
>>> Ahh, my Receiver.onStop method is currently empty.
>>>
>>> 1) I have a hard time seeing why the receiver would crash so many
>>> times within a span of 4 to 5 hours, but I understand I should still
>>> clean up in onStop.
>>>
>>> 2) How do I clean up those threads? The documentation at
>>> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html
>>> doesn't seem to have any method for cleaning up the threads created
>>> during onStart. Any ideas?
>>>
>>> Thanks!
>>>
>>> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>>
>>>> So in your code, each Receiver will start a new thread. Did you stop
>>>> the receiver properly in `Receiver.onStop`? Otherwise, you may leak
>>>> threads after a receiver crashes and is restarted by Spark. However,
>>>> this may be the root cause since the leaked threads are on the
>>>> driver side. Could you use `jstack` to check which types of threads
>>>> are leaking?
>>>>
>>>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> I am also under the assumption that the *onStart* function of the
>>>>> Receiver is only called once by Spark. Please correct me if I am
>>>>> wrong.
>>>>>
>>>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> My driver program runs a Spark Streaming job.
>>>>>> It spawns a thread by itself only in the onStart() function below;
>>>>>> other than that it doesn't spawn any other threads. It only calls
>>>>>> mapToPair, reduceByKey, foreachRDD, and collect.
>>>>>>
>>>>>> public class NSQReceiver extends Receiver<String> {
>>>>>>
>>>>>>     private String topic = "";
>>>>>>
>>>>>>     public NSQReceiver(String topic) {
>>>>>>         super(StorageLevel.MEMORY_AND_DISK_2());
>>>>>>         this.topic = topic;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void onStart() {
>>>>>>         new Thread() {
>>>>>>             @Override
>>>>>>             public void run() {
>>>>>>                 receive();
>>>>>>             }
>>>>>>         }.start();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Environment info:
>>>>>>
>>>>>> Java 8
>>>>>> Scala 2.11.8
>>>>>> Spark 2.0.0
>>>>>>
>>>>>> More than happy to share any other info you may need.
>>>>>>
>>>>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <ja...@odersky.com> wrote:
>>>>>>
>>>>>>> > how do I tell my spark driver program to not create so many?
>>>>>>>
>>>>>>> This may depend on your driver program. Do you spawn any threads
>>>>>>> in it? Could you share some more information on the driver
>>>>>>> program, Spark version, and your environment? It would greatly
>>>>>>> help others to help you.
>>>>>>>
>>>>>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> The source of my problem is actually that I am running into the
>>>>>>>> following error. This error seems to happen after running my
>>>>>>>> driver program for 4 hours.
>>>>>>>>
>>>>>>>> "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>>>>>>>> "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
>>>>>>>> java.lang.OutOfMemoryError: unable to create new native thread"
>>>>>>>>
>>>>>>>> and this wonderful book taught me that the error "unable to create
>>>>>>>> new native thread" can happen because the JVM is asking the OS for
>>>>>>>> a new thread and the OS is refusing, for one of the following
>>>>>>>> reasons:
>>>>>>>>
>>>>>>>> 1. The system has actually run out of virtual memory.
>>>>>>>> 2. On Unix-style systems, the user has already created (across all
>>>>>>>> programs the user is running) the maximum number of processes
>>>>>>>> configured for that user login. Individual threads are considered
>>>>>>>> a process in that regard.
>>>>>>>>
>>>>>>>> Option #2 is ruled out in my case because my driver program is
>>>>>>>> running with a userid of root, which has the maximum number of
>>>>>>>> processes set to 120242.
>>>>>>>>
>>>>>>>> ulimit -a gives me the following:
>>>>>>>>
>>>>>>>> core file size          (blocks, -c) 0
>>>>>>>> data seg size           (kbytes, -d) unlimited
>>>>>>>> scheduling priority             (-e) 0
>>>>>>>> file size               (blocks, -f) unlimited
>>>>>>>> pending signals                 (-i) 120242
>>>>>>>> max locked memory       (kbytes, -l) 64
>>>>>>>> max memory size         (kbytes, -m) unlimited
>>>>>>>> open files                      (-n) 1024
>>>>>>>> pipe size            (512 bytes, -p) 8
>>>>>>>> POSIX message queues     (bytes, -q) 819200
>>>>>>>> real-time priority              (-r) 0
>>>>>>>> stack size              (kbytes, -s) 8192
>>>>>>>> cpu time               (seconds, -t) unlimited
>>>>>>>> max user processes              (-u) 120242
>>>>>>>> virtual memory          (kbytes, -v) unlimited
>>>>>>>> file locks                      (-x) unlimited
>>>>>>>>
>>>>>>>> So at this point I do understand that I am running out of memory
>>>>>>>> due to the allocation of threads, so my biggest question is: how
>>>>>>>> do I tell my Spark driver program to
>>>>>>>> not create so many?
>>>>>>>>
>>>>>>>> On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <so...@cloudera.com> wrote:
>>>>>>>>
>>>>>>>>> ps -L [pid] is what shows threads. I am not sure this is counting
>>>>>>>>> what you think it does. My shell process has about a hundred
>>>>>>>>> threads, and I can't imagine why one would have thousands unless
>>>>>>>>> your app spawned them.
>>>>>>>>>
>>>>>>>>> On Mon, Oct 31, 2016 at 10:20 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> When I do
>>>>>>>>>>
>>>>>>>>>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>>>>>>>>>
>>>>>>>>>> the result is around 32K. Why does it create so many threads,
>>>>>>>>>> and how can I limit this?
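[Editor's note: the fix this thread converges on is for the receiver to keep a reference to the thread it starts in onStart and interrupt/join it in onStop, so a receiver that crashes and is restarted by Spark does not leave its old thread behind. A minimal sketch of that pattern follows. It is shown against a bare stand-in class rather than Spark's actual org.apache.spark.streaming.receiver.Receiver so it compiles without Spark; in the real NSQReceiver the class would extend Receiver<String> and the onStart/onStop bodies would carry over, with receive() being the actual NSQ polling logic.]

```java
// Stand-in for a Spark receiver that cleans up its worker thread.
// The onStart/onStop names mirror Spark's Receiver API but this class
// does not extend it, so the sketch is self-contained.
class StoppableReceiver {

    private volatile boolean running = false;
    private Thread worker;

    // In the real NSQReceiver: @Override public void onStart()
    public void onStart() {
        running = true;
        worker = new Thread(() -> {
            // Exit promptly when either the flag is cleared or we are interrupted.
            while (running && !Thread.currentThread().isInterrupted()) {
                receive();
            }
        }, "nsq-receiver-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // In the real NSQReceiver: @Override public void onStop()
    public void onStop() {
        running = false;               // signal the loop to exit
        if (worker != null) {
            worker.interrupt();        // wake the thread if it is blocked
            try {
                worker.join(5000);     // wait for it to actually terminate
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            worker = null;             // drop the reference once it is dead
        }
    }

    private void receive() {
        // Placeholder for the actual polling logic; sleeps briefly so the
        // loop re-checks its exit conditions regularly.
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public boolean isWorkerAlive() {
        Thread w = worker;
        return w != null && w.isAlive();
    }
}
```

With this shape, each restart of the receiver reuses at most one worker thread instead of stacking a new orphaned thread on every crash, which is exactly the accumulation that eventually triggers "unable to create new native thread".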