Hi,

I set spark.shuffle.io.preferDirectBufs to false in SparkConf and this
setting can be seen in web ui's environment tab. But, it still eats memory,
i.e. -Xmx set to 512M but RES grows to 1.5G in half a day.


On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:

> Could you set "spark.shuffle.io.preferDirectBufs" to false to turn off the
> off-heap allocation of netty?
>
> Best Regards,
> Shixiong Zhu
>
> 2015-06-03 11:58 GMT+08:00 Ji ZHANG <zhangj...@gmail.com>:
>
>> Hi,
>>
>> Thanks for you information. I'll give spark1.4 a try when it's released.
>>
>> On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Could you try it out with Spark 1.4 RC3?
>>>
>>> Also pinging, Cloudera folks, they may be aware of something.
>>>
>>> BTW, the way I have debugged memory leaks in the past is as follows.
>>>
>>> Run with a small driver memory, say 1 GB. Periodically (maybe a script),
>>> take snapshots of histogram and also do memory dumps. Say every hour. And
>>> then compare the difference between two histo/dumps that are few hours
>>> separated (more the better). Diffing histo is easy. Diff two dumps can be
>>> done in JVisualVM, it will show the diff in the objects that got added in
>>> the later dump. That makes it easy to debug what is not getting cleaned.
>>>
>>> TD
>>>
>>>
>>> On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for you reply. Here's the top 30 entries of jmap -histo:live
>>>> result:
>>>>
>>>>  num     #instances         #bytes  class name
>>>> ----------------------------------------------
>>>>    1:         40802      145083848  [B
>>>>    2:         99264       12716112  <methodKlass>
>>>>    3:         99264       12291480  <constMethodKlass>
>>>>    4:          8472        9144816  <constantPoolKlass>
>>>>    5:          8472        7625192  <instanceKlassKlass>
>>>>    6:           186        6097824
>>>>  [Lscala.concurrent.forkjoin.ForkJoinTask;
>>>>    7:          7045        4804832  <constantPoolCacheKlass>
>>>>    8:        139168        4453376  java.util.HashMap$Entry
>>>>    9:          9427        3542512  <methodDataKlass>
>>>>   10:        141312        3391488
>>>>  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>>>   11:        135491        3251784  java.lang.Long
>>>>   12:         26192        2765496  [C
>>>>   13:           813        1140560  [Ljava.util.HashMap$Entry;
>>>>   14:          8997        1061936  java.lang.Class
>>>>   15:         16022         851384  [[I
>>>>   16:         16447         789456  java.util.zip.Inflater
>>>>   17:         13855         723376  [S
>>>>   18:         17282         691280  java.lang.ref.Finalizer
>>>>   19:         25725         617400  java.lang.String
>>>>   20:           320         570368
>>>>  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>>>>   21:         16066         514112
>>>>  java.util.concurrent.ConcurrentHashMap$HashEntry
>>>>   22:         12288         491520
>>>>  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
>>>>   23:         13343         426976
>>>>  java.util.concurrent.locks.ReentrantLock$NonfairSync
>>>>   24:         12288         396416
>>>>  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
>>>>   25:         16447         394728  java.util.zip.ZStreamRef
>>>>   26:           565         370080  [I
>>>>   27:           508         272288  <objArrayKlassKlass>
>>>>   28:         16233         259728  java.lang.Object
>>>>   29:           771         209232
>>>>  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
>>>>   30:          2524         192312  [Ljava.lang.Object;
>>>>
>>>> But as I mentioned above, the heap memory seems OK, the extra memory is
>>>> consumed by some off-heap data. I can't find a way to figure out what is in
>>>> there.
>>>>
>>>> Besides, I did some extra experiments, i.e. run the same program in
>>>> difference environments to test whether it has off-heap memory issue:
>>>>
>>>> spark1.0 + standalone = no
>>>> spark1.0 + yarn = no
>>>> spark1.3 + standalone = no
>>>> spark1.3 + yarn = yes
>>>>
>>>> I'm using CDH5.1, so the spark1.0 is provided by cdh, and
>>>> spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
>>>>
>>>> I could use spark1.0 + yarn, but I can't find a way to handle the logs,
>>>> level and rolling, so it'll explode the harddrive.
>>>>
>>>> Currently I'll stick to spark1.0 + standalone, until our ops team
>>>> decides to upgrade cdh.
>>>>
>>>>
>>>>
>>>> On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>>> While you are running is it possible for you login into the YARN node
>>>>> and get histograms of live objects using "jmap -histo:live". That may
>>>>> reveal something.
>>>>>
>>>>>
>>>>> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Unfortunately, they're still growing, both driver and executors.
>>>>>>
>>>>>> I run the same job with local mode, everything is fine.
>>>>>>
>>>>>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <
>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Can you replace your counting part with this?
>>>>>>>
>>>>>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I wrote a simple test job, it only does very basic operations. for
>>>>>>>> example:
>>>>>>>>
>>>>>>>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>>>> Map(topic -> 1)).map(_._2)
>>>>>>>>     val logs = lines.flatMap { line =>
>>>>>>>>       try {
>>>>>>>>         Some(parse(line).extract[Impression])
>>>>>>>>       } catch {
>>>>>>>>         case _: Exception => None
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>>>>>       rdd.foreachPartition { iter =>
>>>>>>>>         iter.foreach(count => logger.info(count.toString))
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>
>>>>>>>> It receives messages from Kafka, parse the json, filter and count
>>>>>>>> the records, and then print it to logs.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <
>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Zhang,
>>>>>>>>>
>>>>>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>>>>>> inside the code to fill up memory.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Yes, I'm using createStream, but the storageLevel param is by
>>>>>>>>>> default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also
>>>>>>>>>> growing. I don't think Kafka messages will be cached in driver.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <
>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you using the createStream or createDirectStream api? If its
>>>>>>>>>>> the former, you can try setting the StorageLevel to MEMORY_AND_DISK 
>>>>>>>>>>> (it
>>>>>>>>>>> might slow things down though). Another way would be to try the 
>>>>>>>>>>> later one.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Akhil,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your reply. Accoding to the Streaming tab of Web UI,
>>>>>>>>>>>> the Processing Time is around 400ms, and there's no Scheduling 
>>>>>>>>>>>> Delay, so I
>>>>>>>>>>>> suppose it's not the Kafka messages that eat up the off-heap 
>>>>>>>>>>>> memory. Or
>>>>>>>>>>>> maybe it is, but how to tell?
>>>>>>>>>>>>
>>>>>>>>>>>> I googled about how to check the off-heap memory usage, there's
>>>>>>>>>>>> a tool called pmap, but I don't know how to interprete the results.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <
>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> After submitting the job, if you do a ps aux | grep
>>>>>>>>>>>>> spark-submit then you can see all JVM params. Are you using the 
>>>>>>>>>>>>> highlevel
>>>>>>>>>>>>> consumer (receiver based) for receiving data from Kafka? In that 
>>>>>>>>>>>>> case if
>>>>>>>>>>>>> your throughput is high and the processing delay exceeds batch 
>>>>>>>>>>>>> interval
>>>>>>>>>>>>> then you will hit this memory issues as the data will keep on 
>>>>>>>>>>>>> receiving and
>>>>>>>>>>>>> is dumped to memory. You can set StorageLevel to MEMORY_AND_DISK 
>>>>>>>>>>>>> (but it
>>>>>>>>>>>>> slows things down). Another alternate will be to use the lowlevel
>>>>>>>>>>>>> kafka consumer
>>>>>>>>>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or to use
>>>>>>>>>>>>> the non-receiver based directStream
>>>>>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>>>>>> that comes up with spark.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <
>>>>>>>>>>>>> zhangj...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster
>>>>>>>>>>>>>> mode. I find out that YARN is killing the driver and executor 
>>>>>>>>>>>>>> process
>>>>>>>>>>>>>> because of excessive use of memory. Here's something I tried:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per
>>>>>>>>>>>>>> 10s), so the extra memory is not used by heap.
>>>>>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is
>>>>>>>>>>>>>> 384), but the memory just keeps growing and then hits the limit.
>>>>>>>>>>>>>> 3. This problem is not shown in low-throughput jobs, neither
>>>>>>>>>>>>>> in standalone mode.
>>>>>>>>>>>>>> 4. The test job just receives messages from Kafka, with batch
>>>>>>>>>>>>>> interval of 1, do some filtering and aggregation, and then print 
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> executor logs. So it's not some 3rd party library that causes 
>>>>>>>>>>>>>> the 'leak'.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Spark 1.3 is built by myself, with correct hadoop versions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Jerry
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Jerry
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jerry
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jerry
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jerry
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jerry
>>>>
>>>
>>>
>>
>>
>> --
>> Jerry
>>
>
>


-- 
Jerry

Reply via email to