Here is a UI of my thread dump:

http://fastthread.io/my-thread-report.jsp?p=c2hhcmVkLzIwMTYvMTEvMS8tLWpzdGFja19kdW1wX3dpbmRvd19pbnRlcnZhbF8xbWluX2JhdGNoX2ludGVydmFsXzFzLnR4dC0tNi0xNy00Ng==
On Mon, Oct 31, 2016 at 7:10 PM, kant kodali <kanth...@gmail.com> wrote:

Hi Ryan,

I think you are right. This may not be related to the Receiver. I have attached the jstack dump here. I do a simple mapToPair and reduceByKey, and I have a window interval of 1 minute (60000 ms) and a batch interval of 1 s (1000 ms). This generates a lot of threads, at least 5 to 8 per second, and the total number of threads increases monotonically. Just for tweaking purposes I changed the window interval to 1 min (60000 ms) and the batch interval to 10 s (10000 ms); this looked a lot better, but it is still not ideal. At the very least it is no longer monotonic (it goes up and down). Now my real question is: how do I tune things so that the number of threads is optimal while still satisfying a window interval of 1 minute (60000 ms) and a batch interval of 1 s (1000 ms)?

This jstack dump was taken after running my Spark driver program for 2 minutes, and there are about 1000 threads.

Thanks!

On Mon, Oct 31, 2016 at 1:09 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

If there are leaking threads, you should be able to see the number of threads increasing. You can just dump threads after 1-2 hours.

On Mon, Oct 31, 2016 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:

Yes, I can certainly use jstack, but it takes 4 to 5 hours for me to reproduce the error, so I will get back as early as possible.

Thanks a lot!

On Mon, Oct 31, 2016 at 12:41 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

Then it should not be a Receiver issue. Could you use `jstack` to find out the names of the leaking threads?

On Mon, Oct 31, 2016 at 12:35 PM, kant kodali <kanth...@gmail.com> wrote:

Hi Ryan,

It happens on the driver side, and I am running in client mode (not cluster mode).

Thanks!
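[Editor's note: watching the live thread count over time, as suggested above, can also be done from inside the JVM instead of repeatedly attaching jstack. A minimal plain-Java sketch follows; the class and method names are illustrative, not from this thread.]

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Logs the JVM's live thread count at a fixed interval, so a monotonic
// climb like the one described above shows up early.
public class ThreadCountMonitor {

    public static int liveThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        return mx.getThreadCount();
    }

    public static void main(String[] args) throws InterruptedException {
        int previous = liveThreads();
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1_000);
            int now = liveThreads();
            System.out.printf("live threads: %d (delta %+d)%n", now, now - previous);
            previous = now;
        }
    }
}
```

Running this alongside the streaming job (or calling `liveThreads()` from a scheduled task in the driver) gives the same trend that `jstack` dumps would, without waiting hours between samples.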
On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

Sorry, there is a typo in my previous email: this may **not** be the root cause if the leaking threads are on the driver side.

Does it happen in the driver or the executors?

On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth...@gmail.com> wrote:

Hi Ryan,

Ahh, my Receiver.onStop method is currently empty.

1) I have a hard time seeing why the receiver would crash so many times within a span of 4 to 5 hours, but I understand I should still clean up during onStop.

2) How do I clean up those threads? The documentation at https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't seem to have any method for cleaning up the threads created during onStart. Any ideas?

Thanks!

On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

So in your code, each Receiver will start a new thread. Did you stop the receiver properly in `Receiver.onStop`? Otherwise, you may leak threads after a receiver crashes and is restarted by Spark. However, this may be the root cause since the leak threads are in the driver side. Could you use `jstack` to check which types of threads are leaking?

On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth...@gmail.com> wrote:

I am also under the assumption that the *onStart* function of the Receiver is called only once by Spark. Please correct me if I am wrong.

On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com> wrote:

My driver program runs a Spark streaming job.
And it spawns a thread by itself only in the *onStart()* function below. Other than that it doesn't spawn any other threads; it only calls mapToPair, reduceByKey, forEachRDD, and collect.

    public class NSQReceiver extends Receiver<String> {

        private String topic = "";

        public NSQReceiver(String topic) {
            super(StorageLevel.MEMORY_AND_DISK_2());
            this.topic = topic;
        }

        @Override
        public void onStart() {
            new Thread() {
                @Override
                public void run() {
                    receive();
                }
            }.start();
        }
    }

Environment info:

Java 8
Scala 2.11.8
Spark 2.0.0

More than happy to share any other info you may need.

On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <ja...@odersky.com> wrote:

> how do I tell my spark driver program to not create so many?

This may depend on your driver program. Do you spawn any threads in it? Could you share some more information on the driver program, the Spark version, and your environment? It would greatly help others to help you.

On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth...@gmail.com> wrote:

The source of my problem is actually that I am running into the following error. It seems to happen after running my driver program for 4 hours:

"Exception in thread "ForkJoinPool-50-worker-11" Exception in thread "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13" java.lang.OutOfMemoryError: unable to create new native thread"

and this wonderful book taught me that the error "unable to create new native thread" can happen when the JVM asks the OS for a new thread and the OS refuses, for the following reasons:

1. The system has actually run out of virtual memory.
2. On Unix-style systems, the user has already created (across all programs the user is running) the maximum number of processes configured for that user login. Individual threads are considered processes in that regard.

Option #2 is ruled out in my case because my driver program runs with a userid of root, which has the maximum number of processes set to 120242.

ulimit -a gives me the following:

    core file size          (blocks, -c) 0
    data seg size           (kbytes, -d) unlimited
    scheduling priority             (-e) 0
    file size               (blocks, -f) unlimited
    pending signals                 (-i) 120242
    max locked memory       (kbytes, -l) 64
    max memory size         (kbytes, -m) unlimited
    open files                      (-n) 1024
    pipe size            (512 bytes, -p) 8
    POSIX message queues     (bytes, -q) 819200
    real-time priority              (-r) 0
    stack size              (kbytes, -s) 8192
    cpu time               (seconds, -t) unlimited
    max user processes              (-u) 120242
    virtual memory          (kbytes, -v) unlimited
    file locks                      (-x) unlimited

So at this point I do understand that I am running out of memory due to the allocation of threads, so my biggest question is: how do I tell my Spark driver program not to create so many?

On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <so...@cloudera.com> wrote:

ps -L [pid] is what shows threads. I am not sure this is counting what you think it does. My shell process has about a hundred threads, and I can't imagine why one would have thousands unless your app spawned them.

On Mon, Oct 31, 2016 at 10:20 AM, kant kodali <kanth...@gmail.com> wrote:

When I do

    ps -elfT | grep "spark-driver-program.jar" | wc -l

the result is around 32K. Why does it create so many threads, and how can I limit this?
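[Editor's note: the fix this thread converges on is stopping the receiver's worker thread from `onStop`, which the `NSQReceiver` above never does, so every receiver restart leaks one thread. A dependency-free sketch of the stop pattern follows; the class and member names are illustrative. In the real receiver, `start()` would be called from `onStart()` and `shutdown()` from `onStop()`.]

```java
// Sketch of a receive loop that can be stopped cleanly: a volatile flag
// ends the loop, interrupt() unblocks any sleep/IO, and join() waits for exit.
public class StoppableWorker {

    private volatile boolean running = false;
    private Thread worker;

    public void start() {
        running = true;
        worker = new Thread(() -> {
            while (running) {
                try {
                    // the real receive() / blocking I/O would go here
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore flag and exit
                    return;
                }
            }
        });
        // A recognizable name makes leaked copies easy to spot in jstack
        worker.setName("nsq-receiver-worker");
        worker.setDaemon(true);
        worker.start();
    }

    public void shutdown() throws InterruptedException {
        running = false;
        if (worker != null) {
            worker.interrupt();
            worker.join(1_000);
        }
    }

    public boolean isRunning() {
        return worker != null && worker.isAlive();
    }
}
```

With an empty `onStop()` and a bare `new Thread(...).start()` in `onStart()`, each restart of the receiver leaves the previous thread running, which is consistent with the monotonically increasing thread count reported above.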