Hi Ryan,

It happens on the driver side, and I am running in client mode (not
cluster mode).

Thanks!

On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> Sorry, there is a typo in my previous email: this may **not** be the root
> cause if the leaked threads are on the driver side.
>
> Does it happen in the driver or executors?
>
> On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Ryan,
>>
>> Ahh, my Receiver.onStop method is currently empty.
>>
>> 1) I have a hard time seeing why the receiver would crash so many times
>> within a span of 4 to 5 hours, but I understand I should still clean up
>> during onStop.
>>
>> 2) How do I clean up those threads? The documentation here
>> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't seem
>> to have any method for cleaning up the threads created during onStart.
>> Any ideas?
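>>
>> (A minimal sketch of the usual cleanup pattern, assuming receive() loops
>> until interrupted: keep a reference to the thread created in onStart()
>> and interrupt it in onStop(). The field name workerThread is just for
>> illustration.)
>>
>> private Thread workerThread;
>>
>> @Override
>> public void onStart() {
>>     workerThread = new Thread(this::receive);
>>     workerThread.start();
>> }
>>
>> @Override
>> public void onStop() {
>>     if (workerThread != null) {
>>         // receive() should check Thread.currentThread().isInterrupted()
>>         // in its loop and return when it is set.
>>         workerThread.interrupt();
>>     }
>> }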
>>
>> Thanks!
>>
>>
>> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> So in your code, each Receiver will start a new thread. Did you stop the
>>> receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
>>> after a receiver crashes and is restarted by Spark. However, this may be
>>> the root cause since the leaked threads are on the driver side. Could you
>>> use `jstack` to check which types of threads are leaking?
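>>>
>>> (One way to do that, assuming you know the driver's PID; jstack ships with
>>> the JDK. Stripping trailing digits from the thread names groups numbered
>>> pool threads together:)
>>>
>>> jstack <driver-pid> | grep '^"' | cut -d'"' -f2 \
>>>     | sed 's/[0-9]*$//' | sort | uniq -c | sort -rn | head
>>>
>>> (A leaking pool then shows up as a single name with a very large count.)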
>>>
>>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> I am also under the assumption that the *onStart* function of the Receiver
>>>> is only called once by Spark. Please correct me if I am wrong.
>>>>
>>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> My driver program runs a Spark Streaming job, and it spawns a thread
>>>>> by itself only in the *onStart()* function below. Other than that it
>>>>> doesn't spawn any other threads; it only calls mapToPair, reduceByKey,
>>>>> foreachRDD, and collect.
>>>>>
>>>>> import org.apache.spark.storage.StorageLevel;
>>>>> import org.apache.spark.streaming.receiver.Receiver;
>>>>>
>>>>> public class NSQReceiver extends Receiver<String> {
>>>>>
>>>>>     private String topic = "";
>>>>>
>>>>>     public NSQReceiver(String topic) {
>>>>>         super(StorageLevel.MEMORY_AND_DISK_2());
>>>>>         this.topic = topic;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void onStart() {
>>>>>         // Spawn a single worker thread; receive() (not shown) reads
>>>>>         // from the topic and hands records to Spark via store().
>>>>>         new Thread() {
>>>>>             @Override public void run() {
>>>>>                 receive();
>>>>>             }
>>>>>         }.start();
>>>>>     }
>>>>>
>>>>> }
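>>>>>
>>>>> (For context, a receiver like this is typically wired into the job with
>>>>> something like the following; the context name ssc and the topic name
>>>>> are just for illustration:)
>>>>>
>>>>> JavaReceiverInputDStream<String> lines =
>>>>>     ssc.receiverStream(new NSQReceiver("my_topic"));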
>>>>>
>>>>>
>>>>> Environment info:
>>>>>
>>>>> Java 8
>>>>>
>>>>> Scala 2.11.8
>>>>>
>>>>> Spark 2.0.0
>>>>>
>>>>> More than happy to share any other info you may need.
>>>>>
>>>>>
>>>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <ja...@odersky.com>
>>>>> wrote:
>>>>>
>>>>>>  > how do I tell my spark driver program to not create so many?
>>>>>>
>>>>>> This may depend on your driver program. Do you spawn any threads in
>>>>>> it? Could you share some more information on the driver program, Spark
>>>>>> version, and your environment? It would greatly help others help you.
>>>>>>
>>>>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>> > The source of my problem is actually that I am running into the
>>>>>> > following error. This error seems to happen after running my driver
>>>>>> > program for 4 hours.
>>>>>> >
>>>>>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>>>>>> > "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
>>>>>> > java.lang.OutOfMemoryError: unable to create new native thread"
>>>>>> >
>>>>>> > and this wonderful book taught me that the error "unable to create new
>>>>>> > native thread" can happen when the JVM asks the OS for a new thread and
>>>>>> > the OS refuses, for one of the following reasons:
>>>>>> >
>>>>>> > 1. The system has actually run out of virtual memory.
>>>>>> > 2. On Unix-style systems, the user has already created (across all
>>>>>> > programs the user is running) the maximum number of processes configured
>>>>>> > for that user login. Individual threads are considered a process in that
>>>>>> > regard.
>>>>>> >
>>>>>> > Option #2 is ruled out in my case because my driver program is running
>>>>>> > with a userid of root, which has the maximum number of processes set to
>>>>>> > 120242.
>>>>>> >
>>>>>> > ulimit -a gives me the following:
>>>>>> >
>>>>>> > core file size          (blocks, -c) 0
>>>>>> > data seg size           (kbytes, -d) unlimited
>>>>>> > scheduling priority             (-e) 0
>>>>>> > file size               (blocks, -f) unlimited
>>>>>> > pending signals                 (-i) 120242
>>>>>> > max locked memory       (kbytes, -l) 64
>>>>>> > max memory size         (kbytes, -m) unlimited
>>>>>> > open files                      (-n) 1024
>>>>>> > pipe size            (512 bytes, -p) 8
>>>>>> > POSIX message queues     (bytes, -q) 819200
>>>>>> > real-time priority              (-r) 0
>>>>>> > stack size              (kbytes, -s) 8192
>>>>>> > cpu time               (seconds, -t) unlimited
>>>>>> > max user processes              (-u) 120242
>>>>>> > virtual memory          (kbytes, -v) unlimited
>>>>>> > file locks                      (-x) unlimited
>>>>>> >
>>>>>> > So at this point I understand that I am running out of memory due to
>>>>>> > the allocation of threads, so my biggest question is: how do I tell my
>>>>>> > spark driver program to not create so many?
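>>>>>> >
>>>>>> > (For scale: each Java thread reserves its own stack. Assuming the
>>>>>> > HotSpot default of -Xss1m on 64-bit Linux, 32,000 threads reserve
>>>>>> > roughly 32,000 x 1 MB = ~32 GB of address space, plus per-thread
>>>>>> > native overhead, so the JVM can hit this error well before the
>>>>>> > 120242 process limit. Shrinking the stack, e.g. via
>>>>>> > spark.driver.extraJavaOptions=-Xss512k, only delays the failure;
>>>>>> > the real fix is to stop leaking threads.)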
>>>>>> >
>>>>>> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <so...@cloudera.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> ps -L [pid] is what shows threads. I am not sure this is counting
>>>>>> what you
>>>>>> >> think it does. My shell process has about a hundred threads, and I
>>>>>> can't
>>>>>> >> imagine why one would have thousands unless your app spawned them.
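>>>>>> >>
>>>>>> >> (A more precise count for a single process, assuming you know the
>>>>>> >> driver's PID; NLWP is the number of lightweight processes, i.e.
>>>>>> >> threads:)
>>>>>> >>
>>>>>> >> ps -o nlwp= -p <driver-pid>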
>>>>>> >>
>>>>>> >> On Mon, Oct 31, 2016 at 10:20 AM kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>> >>>
>>>>>> >>> When I do
>>>>>> >>>
>>>>>> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>>>>> >>>
>>>>>> >>> the result is around 32K. Why does it create so many threads, and how
>>>>>> >>> can I limit this?
>>>>>> >
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
