Hi,

I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and the setting shows up in the Web UI's Environment tab. But the process still eats memory: -Xmx is set to 512M, yet RES grows to 1.5G within half a day.
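
A minimal sketch of how that setting is applied, assuming it goes on the SparkConf before the StreamingContext is created (the app name and batch interval here are made up):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Ask netty to prefer heap buffers over direct (off-heap) ones.
    val conf = new SparkConf()
      .setAppName("streaming-mem-test") // made-up name
      .set("spark.shuffle.io.preferDirectBufs", "false")
    val ssc = new StreamingContext(conf, Seconds(1))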
On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:

> Could you set "spark.shuffle.io.preferDirectBufs" to false to turn off the off-heap allocation of netty?
>
> Best Regards,
> Shixiong Zhu
>
> 2015-06-03 11:58 GMT+08:00 Ji ZHANG <zhangj...@gmail.com>:
>
>> Hi,
>>
>> Thanks for your information. I'll give Spark 1.4 a try when it's released.
>>
>> On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com> wrote:
>>
>>> Could you try it out with Spark 1.4 RC3?
>>>
>>> Also pinging the Cloudera folks; they may be aware of something.
>>>
>>> BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe with a script) take snapshots of the histogram and also do memory dumps, say every hour. Then compare two histos/dumps that are a few hours apart (the more, the better). Diffing histos is easy; diffing two dumps can be done in JVisualVM, which will show the objects that got added in the later dump. That makes it easy to see what is not getting cleaned up.
>>>
>>> TD
>>>
>>> On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:
>>>>
>>>>  num     #instances         #bytes  class name
>>>> ----------------------------------------------
>>>>    1:         40802      145083848  [B
>>>>    2:         99264       12716112  <methodKlass>
>>>>    3:         99264       12291480  <constMethodKlass>
>>>>    4:          8472        9144816  <constantPoolKlass>
>>>>    5:          8472        7625192  <instanceKlassKlass>
>>>>    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
>>>>    7:          7045        4804832  <constantPoolCacheKlass>
>>>>    8:        139168        4453376  java.util.HashMap$Entry
>>>>    9:          9427        3542512  <methodDataKlass>
>>>>   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>>>   11:        135491        3251784  java.lang.Long
>>>>   12:         26192        2765496  [C
>>>>   13:           813        1140560  [Ljava.util.HashMap$Entry;
>>>>   14:          8997        1061936  java.lang.Class
>>>>   15:         16022         851384  [[I
>>>>   16:         16447         789456  java.util.zip.Inflater
>>>>   17:         13855         723376  [S
>>>>   18:         17282         691280  java.lang.ref.Finalizer
>>>>   19:         25725         617400  java.lang.String
>>>>   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>>>>   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
>>>>   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
>>>>   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
>>>>   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
>>>>   25:         16447         394728  java.util.zip.ZStreamRef
>>>>   26:           565         370080  [I
>>>>   27:           508         272288  <objArrayKlassKlass>
>>>>   28:         16233         259728  java.lang.Object
>>>>   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
>>>>   30:          2524         192312  [Ljava.lang.Object;
>>>>
>>>> But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data, and I can't find a way to figure out what is in there.
>>>>
>>>> Besides, I did some extra experiments, i.e. ran the same program in different environments to test whether it has the off-heap memory issue:
>>>>
>>>> spark1.0 + standalone = no
>>>> spark1.0 + yarn       = no
>>>> spark1.3 + standalone = no
>>>> spark1.3 + yarn       = yes
>>>>
>>>> I'm using CDH 5.1, so spark1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
>>>>
>>>> I could use spark1.0 + yarn, but I can't find a way to handle the logs (level and rolling), so it would explode the hard drive. Currently I'll stick to spark1.0 + standalone until our ops team decides to upgrade CDH.
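
A minimal sketch of the periodic-snapshot loop TD describes above, assuming jmap is on the PATH and the target JVM's pid is passed as the first argument; the object name and file names are illustrative:

    import java.io.File
    import scala.sys.process._

    object HeapSnapshots extends App {
      val pid = args(0) // pid of the driver or executor JVM
      var i = 0
      while (true) {
        // Live-object histogram (triggers a full GC first), one numbered file per run.
        (s"jmap -histo:live $pid" #> new File(s"histo-$i.txt")).!
        // Binary heap dump, for diffing later in JVisualVM.
        s"jmap -dump:live,format=b,file=dump-$i.hprof $pid".!
        i += 1
        Thread.sleep(60 * 60 * 1000L) // hourly, as suggested above
      }
    }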
>>>>
>>>> On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>
>>>>> While it is running, is it possible for you to log into the YARN node and get histograms of live objects using "jmap -histo:live"? That may reveal something.
>>>>>
>>>>> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Unfortunately, they're still growing, on both the driver and the executors.
>>>>>>
>>>>>> I ran the same job in local mode and everything is fine.
>>>>>>
>>>>>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Can you replace your counting part with this?
>>>>>>>
>>>>>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I wrote a simple test job; it only does very basic operations, for example:
>>>>>>>>
>>>>>>>> // JSON parsing via json4s (inferred from parse/extract)
>>>>>>>> import org.json4s._
>>>>>>>> import org.json4s.jackson.JsonMethods._
>>>>>>>> implicit val formats = DefaultFormats
>>>>>>>>
>>>>>>>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
>>>>>>>> // Parse each line into an Impression, dropping malformed records.
>>>>>>>> val logs = lines.flatMap { line =>
>>>>>>>>   try {
>>>>>>>>     Some(parse(line).extract[Impression])
>>>>>>>>   } catch {
>>>>>>>>     case _: Exception => None
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> // Count the filtered records per batch and log the count.
>>>>>>>> logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>>>>>   rdd.foreachPartition { iter =>
>>>>>>>>     iter.foreach(count => logger.info(count.toString))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the count to the logs.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Zhang,
>>>>>>>>>
>>>>>>>>> Could you paste your code in a gist? Not sure what you are doing inside the code to fill up memory.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Yes, I'm using createStream, but the storageLevel param is MEMORY_AND_DISK_SER_2 by default. Besides, the driver's memory is also growing, and I don't think Kafka messages would be cached in the driver.
>>>>>>>>>>
>>>>>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you using the createStream or the createDirectStream api? If it's the former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might slow things down, though). Another way would be to try the latter.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Akhil,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your reply. According to the Streaming tab of the Web UI, the Processing Time is around 400 ms and there's no Scheduling Delay, so I suppose it's not the Kafka messages that eat up the off-heap memory. Or maybe it is, but how can I tell?
>>>>>>>>>>>>
>>>>>>>>>>>> I googled how to check off-heap memory usage; there's a tool called pmap, but I don't know how to interpret its results.
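
A rough sketch of the receiver-less createDirectStream alternative Akhil suggests, assuming the Spark 1.3 spark-streaming-kafka API; ssc and topic are the ones from the test job above, and the broker address is made up:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // No receiver: batches are read from Kafka at processing time instead of
    // being buffered by a receiver first.
    val kafkaParams = Map("metadata.broker.list" -> "kafka01:9092") // made-up broker
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set(topic)).map(_._2)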
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> After submitting the job, if you do a ps aux | grep spark-submit you can see all the JVM params. Are you using the high-level consumer (receiver-based) for receiving data from Kafka? In that case, if your throughput is high and the processing delay exceeds the batch interval, you will hit this memory issue, as data keeps being received and dumped into memory. You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down). Another alternative is to use the low-level kafka consumer <https://github.com/dibbhatt/kafka-spark-consumer> or the non-receiver-based directStream <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers> that comes with Spark.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH 5.1 with yarn-cluster mode. I found that YARN is killing the driver and executor processes because of excessive memory use. Here's what I tried:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the extra memory is not used by the heap.
>>>>>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (the default is 384), but the memory just keeps growing until it hits the limit.
>>>>>>>>>>>>>> 3. The problem does not show up in low-throughput jobs, nor in standalone mode.
>>>>>>>>>>>>>> 4. The test job just receives messages from Kafka with a batch interval of 1 second, does some filtering and aggregation, and then prints to the executor logs. So it's not some third-party library that causes the 'leak'.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I built Spark 1.3 myself, with the correct Hadoop version.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Jerry

--
Jerry
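
For reference, a sketch of the two memoryOverhead settings mentioned in point 2 above, assuming the YARN-specific keys used by Spark 1.3 (values in MB, the ones from the thread; in yarn-cluster mode these are normally passed with --conf on spark-submit rather than set in code):

    import org.apache.spark.SparkConf

    // Extra headroom YARN grants beyond -Xmx before it kills the container.
    val conf = new SparkConf()
      .set("spark.yarn.driver.memoryOverhead", "1024")
      .set("spark.yarn.executor.memoryOverhead", "1024")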