Hi,

We switched from ParallelGC to CMS, and the symptom is gone.
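In case it helps, a minimal sketch of the settings involved, assuming the standard extraJavaOptions keys (not our exact flag set; in yarn-cluster mode the driver's flags have to be passed at submit time, e.g. via spark-defaults.conf, rather than set in SparkConf):

    import org.apache.spark.SparkConf

    // Sketch only: switch the executors from ParallelGC to CMS. The driver
    // needs the same flag via spark-defaults.conf or --conf on spark-submit,
    // since its JVM is already running by the time SparkConf is read.
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")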
On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and the setting shows up in the web UI's Environment tab. But it still eats memory: -Xmx is set to 512M, yet RES grows to 1.5G in half a day.

On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:

Could you set "spark.shuffle.io.preferDirectBufs" to false to turn off the off-heap allocation of netty?

Best Regards,
Shixiong Zhu

2015-06-03 11:58 GMT+08:00 Ji ZHANG <zhangj...@gmail.com>:

Hi,

Thanks for your information. I'll give Spark 1.4 a try when it's released.

On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com> wrote:

Could you try it out with Spark 1.4 RC3?

Also pinging Cloudera folks; they may be aware of something.

BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe with a script), take snapshots of the histogram and also take memory dumps, say every hour. Then compare two histos/dumps that are a few hours apart (the more, the better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM; it will show the diff in the objects that got added in the later dump. That makes it easy to see what is not getting cleaned up.

TD

On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:

 num     #instances         #bytes  class name
----------------------------------------------
   1:         40802      145083848  [B
   2:         99264       12716112  <methodKlass>
   3:         99264       12291480  <constMethodKlass>
   4:          8472        9144816  <constantPoolKlass>
   5:          8472        7625192  <instanceKlassKlass>
   6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
   7:          7045        4804832  <constantPoolCacheKlass>
   8:        139168        4453376  java.util.HashMap$Entry
   9:          9427        3542512  <methodDataKlass>
  10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  11:        135491        3251784  java.lang.Long
  12:         26192        2765496  [C
  13:           813        1140560  [Ljava.util.HashMap$Entry;
  14:          8997        1061936  java.lang.Class
  15:         16022         851384  [[I
  16:         16447         789456  java.util.zip.Inflater
  17:         13855         723376  [S
  18:         17282         691280  java.lang.ref.Finalizer
  19:         25725         617400  java.lang.String
  20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
  22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
  24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  25:         16447         394728  java.util.zip.ZStreamRef
  26:           565         370080  [I
  27:           508         272288  <objArrayKlassKlass>
  28:         16233         259728  java.lang.Object
  29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
  30:          2524         192312  [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data, and I can't find a way to figure out what is in there.
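A rough sketch of scripting the periodic histogram/dump snapshots described above, for later diffing (assuming jmap from the running JDK is on the PATH; the pid and output paths are placeholders):

    import scala.sys.process._
    import java.io.File

    // Sketch: hourly jmap histogram plus heap dump for later diffing
    // (e.g. in JVisualVM). Pid and output paths are made up.
    val pid = "12345"
    while (true) {
      val stamp = System.currentTimeMillis / 1000
      (s"jmap -histo:live $pid" #> new File(s"/tmp/histo-$stamp.txt")).!
      s"jmap -dump:live,format=b,file=/tmp/dump-$stamp.hprof $pid".!
      Thread.sleep(60L * 60 * 1000) // one hour
    }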
Besides, I did some extra experiments: I ran the same program in different environments to test whether the off-heap memory issue appears:

Spark 1.0 + standalone = no
Spark 1.0 + YARN = no
Spark 1.3 + standalone = no
Spark 1.3 + YARN = yes

I'm using CDH 5.1, so Spark 1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

I could use Spark 1.0 + YARN, but I can't find a way to handle the logs (level and rolling), so it would fill up the hard drive.

For now I'll stick to Spark 1.0 + standalone, until our ops team decides to upgrade CDH.

On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com> wrote:

While it is running, is it possible for you to log into the YARN node and get histograms of live objects using "jmap -histo:live"? That may reveal something.

On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

Unfortunately, they're still growing, both driver and executors.

I ran the same job in local mode and everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Can you replace your counting part with this?

    logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))

Thanks
Best Regards

On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

I wrote a simple test job that only does very basic operations, for example:

    // parse/extract come from json4s; Impression is our case class for the log schema
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
    val logs = lines.flatMap { line =>
      try {
        Some(parse(line).extract[Impression])
      } catch {
        case _: Exception => None
      }
    }

    logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        iter.foreach(count => logger.info(count.toString))
      }
    }

It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the counts to the logs.

Thanks.

On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Hi Zhang,

Could you paste your code in a gist? Not sure what you are doing inside the code to fill up memory.

Thanks
Best Regards

On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

Yes, I'm using createStream, but the storageLevel param defaults to MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing, and I don't think Kafka messages would be cached in the driver.

On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Are you using the createStream or createDirectStream API? If it's the former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might slow things down though). Another way would be to try the latter.
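For the direct approach, something along these lines; a rough sketch for Spark 1.3 (the broker list and topic are placeholders, and ssc is your StreamingContext):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Sketch of the direct (receiver-less) stream: no receiver means no
    // blocks parked in receiver memory. Broker list and topic are placeholders.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val directLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic")).map(_._2)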
Thanks
Best Regards

On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi Akhil,

Thanks for your reply. According to the Streaming tab of the web UI, the Processing Time is around 400ms and there's no Scheduling Delay, so I suppose it's not the Kafka messages that are eating up the off-heap memory. Or maybe it is, but how can I tell?

I googled how to check off-heap memory usage; there's a tool called pmap, but I don't know how to interpret its results.

On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

After submitting the job, if you do a ps aux | grep spark-submit, you can see all the JVM params. Are you using the high-level consumer (receiver-based) for receiving data from Kafka? In that case, if your throughput is high and the processing delay exceeds the batch interval, you will hit this memory issue, as data keeps being received and dumped to memory. You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down). Another alternative is to use the low-level Kafka consumer <https://github.com/dibbhatt/kafka-spark-consumer> or the non-receiver-based directStream <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers> that comes with Spark.

Thanks
Best Regards

On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

I'm using Spark Streaming 1.3 on CDH 5.1 in yarn-cluster mode. I've found that YARN is killing the driver and executor processes because of excessive memory use. Here's what I have tried:

1. Xmx is set to 512M and the GC looks fine (one young GC per 10s), so the extra memory is not used by the heap.
2. I set the two memoryOverhead params to 1024 (the default is 384), but the memory just keeps growing and then hits the limit (see the sketch below).
3. The problem does not show up in low-throughput jobs, nor in standalone mode.
4. The test job just receives messages from Kafka, with a batch interval of 1, does some filtering and aggregation, and then prints to the executor logs. So it's not some 3rd-party library that causes the 'leak'.

Spark 1.3 was built by myself, with the correct Hadoop versions.

Any ideas will be appreciated.

Thanks.
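For reference, point 2 above was set along these lines; a sketch (values in MB, assuming the Spark-on-YARN keys in 1.3):

    import org.apache.spark.SparkConf

    // Sketch of point 2: raise the YARN memory overhead from the 384 MB
    // default to 1024 MB for both driver and executors.
    val conf = new SparkConf()
      .set("spark.yarn.driver.memoryOverhead", "1024")
      .set("spark.yarn.executor.memoryOverhead", "1024")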
--
Jerry