Hi,

We switched from ParallelGC to CMS, and the symptom is gone.

On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

> Hi,
>
> I set spark.shuffle.io.preferDirectBufs to false in SparkConf and this
> setting can be seen in web ui's environment tab. But, it still eats memory,
> i.e. -Xmx set to 512M but RES grows to 1.5G in half a day.
>
>
> On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>
>> Could you set "spark.shuffle.io.preferDirectBufs" to false to turn off
>> the off-heap allocation of netty?
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-06-03 11:58 GMT+08:00 Ji ZHANG <zhangj...@gmail.com>:
>>
>>> Hi,
>>>
>>> Thanks for you information. I'll give spark1.4 a try when it's released.
>>>
>>> On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Could you try it out with Spark 1.4 RC3?
>>>>
>>>> Also pinging, Cloudera folks, they may be aware of something.
>>>>
>>>> BTW, the way I have debugged memory leaks in the past is as follows.
>>>>
>>>> Run with a small driver memory, say 1 GB. Periodically (maybe a
>>>> script), take snapshots of histogram and also do memory dumps. Say every
>>>> hour. And then compare the difference between two histo/dumps that are few
>>>> hours separated (more the better). Diffing histo is easy. Diff two dumps
>>>> can be done in JVisualVM, it will show the diff in the objects that got
>>>> added in the later dump. That makes it easy to debug what is not getting
>>>> cleaned.
>>>>
>>>> TD
>>>>
>>>>
>>>> On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for you reply. Here's the top 30 entries of jmap -histo:live
>>>>> result:
>>>>>
>>>>>  num     #instances         #bytes  class name
>>>>> ----------------------------------------------
>>>>>    1:         40802      145083848  [B
>>>>>    2:         99264       12716112  <methodKlass>
>>>>>    3:         99264       12291480  <constMethodKlass>
>>>>>    4:          8472        9144816  <constantPoolKlass>
>>>>>    5:          8472        7625192  <instanceKlassKlass>
>>>>>    6:           186        6097824
>>>>>  [Lscala.concurrent.forkjoin.ForkJoinTask;
>>>>>    7:          7045        4804832  <constantPoolCacheKlass>
>>>>>    8:        139168        4453376  java.util.HashMap$Entry
>>>>>    9:          9427        3542512  <methodDataKlass>
>>>>>   10:        141312        3391488
>>>>>  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>>>>   11:        135491        3251784  java.lang.Long
>>>>>   12:         26192        2765496  [C
>>>>>   13:           813        1140560  [Ljava.util.HashMap$Entry;
>>>>>   14:          8997        1061936  java.lang.Class
>>>>>   15:         16022         851384  [[I
>>>>>   16:         16447         789456  java.util.zip.Inflater
>>>>>   17:         13855         723376  [S
>>>>>   18:         17282         691280  java.lang.ref.Finalizer
>>>>>   19:         25725         617400  java.lang.String
>>>>>   20:           320         570368
>>>>>  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>>>>>   21:         16066         514112
>>>>>  java.util.concurrent.ConcurrentHashMap$HashEntry
>>>>>   22:         12288         491520
>>>>>  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
>>>>>   23:         13343         426976
>>>>>  java.util.concurrent.locks.ReentrantLock$NonfairSync
>>>>>   24:         12288         396416
>>>>>  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
>>>>>   25:         16447         394728  java.util.zip.ZStreamRef
>>>>>   26:           565         370080  [I
>>>>>   27:           508         272288  <objArrayKlassKlass>
>>>>>   28:         16233         259728  java.lang.Object
>>>>>   29:           771         209232
>>>>>  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
>>>>>   30:          2524         192312  [Ljava.lang.Object;
>>>>>
>>>>> But as I mentioned above, the heap memory seems OK, the extra memory
>>>>> is consumed by some off-heap data. I can't find a way to figure out what 
>>>>> is
>>>>> in there.
>>>>>
>>>>> Besides, I did some extra experiments, i.e. run the same program in
>>>>> difference environments to test whether it has off-heap memory issue:
>>>>>
>>>>> spark1.0 + standalone = no
>>>>> spark1.0 + yarn = no
>>>>> spark1.3 + standalone = no
>>>>> spark1.3 + yarn = yes
>>>>>
>>>>> I'm using CDH5.1, so the spark1.0 is provided by cdh, and
>>>>> spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
>>>>>
>>>>> I could use spark1.0 + yarn, but I can't find a way to handle the
>>>>> logs, level and rolling, so it'll explode the harddrive.
>>>>>
>>>>> Currently I'll stick to spark1.0 + standalone, until our ops team
>>>>> decides to upgrade cdh.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> While you are running is it possible for you login into the YARN node
>>>>>> and get histograms of live objects using "jmap -histo:live". That may
>>>>>> reveal something.
>>>>>>
>>>>>>
>>>>>> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Unfortunately, they're still growing, both driver and executors.
>>>>>>>
>>>>>>> I run the same job with local mode, everything is fine.
>>>>>>>
>>>>>>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <
>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>> Can you replace your counting part with this?
>>>>>>>>
>>>>>>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I wrote a simple test job, it only does very basic operations. for
>>>>>>>>> example:
>>>>>>>>>
>>>>>>>>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>>>>> Map(topic -> 1)).map(_._2)
>>>>>>>>>     val logs = lines.flatMap { line =>
>>>>>>>>>       try {
>>>>>>>>>         Some(parse(line).extract[Impression])
>>>>>>>>>       } catch {
>>>>>>>>>         case _: Exception => None
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>>>>>>       rdd.foreachPartition { iter =>
>>>>>>>>>         iter.foreach(count => logger.info(count.toString))
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> It receives messages from Kafka, parse the json, filter and count
>>>>>>>>> the records, and then print it to logs.
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <
>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Zhang,
>>>>>>>>>>
>>>>>>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>>>>>>> inside the code to fill up memory.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Yes, I'm using createStream, but the storageLevel param is by
>>>>>>>>>>> default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also
>>>>>>>>>>> growing. I don't think Kafka messages will be cached in driver.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <
>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you using the createStream or createDirectStream api? If
>>>>>>>>>>>> its the former, you can try setting the StorageLevel to 
>>>>>>>>>>>> MEMORY_AND_DISK (it
>>>>>>>>>>>> might slow things down though). Another way would be to try the 
>>>>>>>>>>>> later one.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Akhil,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your reply. Accoding to the Streaming tab of Web
>>>>>>>>>>>>> UI, the Processing Time is around 400ms, and there's no 
>>>>>>>>>>>>> Scheduling Delay,
>>>>>>>>>>>>> so I suppose it's not the Kafka messages that eat up the off-heap 
>>>>>>>>>>>>> memory.
>>>>>>>>>>>>> Or maybe it is, but how to tell?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I googled about how to check the off-heap memory usage,
>>>>>>>>>>>>> there's a tool called pmap, but I don't know how to interprete 
>>>>>>>>>>>>> the results.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <
>>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> After submitting the job, if you do a ps aux | grep
>>>>>>>>>>>>>> spark-submit then you can see all JVM params. Are you using the 
>>>>>>>>>>>>>> highlevel
>>>>>>>>>>>>>> consumer (receiver based) for receiving data from Kafka? In that 
>>>>>>>>>>>>>> case if
>>>>>>>>>>>>>> your throughput is high and the processing delay exceeds batch 
>>>>>>>>>>>>>> interval
>>>>>>>>>>>>>> then you will hit this memory issues as the data will keep on 
>>>>>>>>>>>>>> receiving and
>>>>>>>>>>>>>> is dumped to memory. You can set StorageLevel to MEMORY_AND_DISK 
>>>>>>>>>>>>>> (but it
>>>>>>>>>>>>>> slows things down). Another alternate will be to use the lowlevel
>>>>>>>>>>>>>> kafka consumer
>>>>>>>>>>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or to use
>>>>>>>>>>>>>> the non-receiver based directStream
>>>>>>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>>>>>>> that comes up with spark.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <
>>>>>>>>>>>>>> zhangj...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster
>>>>>>>>>>>>>>> mode. I find out that YARN is killing the driver and executor 
>>>>>>>>>>>>>>> process
>>>>>>>>>>>>>>> because of excessive use of memory. Here's something I tried:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per
>>>>>>>>>>>>>>> 10s), so the extra memory is not used by heap.
>>>>>>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is
>>>>>>>>>>>>>>> 384), but the memory just keeps growing and then hits the limit.
>>>>>>>>>>>>>>> 3. This problem is not shown in low-throughput jobs, neither
>>>>>>>>>>>>>>> in standalone mode.
>>>>>>>>>>>>>>> 4. The test job just receives messages from Kafka, with
>>>>>>>>>>>>>>> batch interval of 1, do some filtering and aggregation, and 
>>>>>>>>>>>>>>> then print to
>>>>>>>>>>>>>>> executor logs. So it's not some 3rd party library that causes 
>>>>>>>>>>>>>>> the 'leak'.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Spark 1.3 is built by myself, with correct hadoop versions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Jerry
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Jerry
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Jerry
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Jerry
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jerry
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jerry
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Jerry
>>>
>>
>>
>
>
> --
> Jerry
>



-- 
Jerry

Reply via email to