Hi,

Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:

 num     #instances         #bytes  class name
----------------------------------------------
   1:         40802      145083848  [B
   2:         99264       12716112  <methodKlass>
   3:         99264       12291480  <constMethodKlass>
   4:          8472        9144816  <constantPoolKlass>
   5:          8472        7625192  <instanceKlassKlass>
   6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
   7:          7045        4804832  <constantPoolCacheKlass>
   8:        139168        4453376  java.util.HashMap$Entry
   9:          9427        3542512  <methodDataKlass>
  10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  11:        135491        3251784  java.lang.Long
  12:         26192        2765496  [C
  13:           813        1140560  [Ljava.util.HashMap$Entry;
  14:          8997        1061936  java.lang.Class
  15:         16022         851384  [[I
  16:         16447         789456  java.util.zip.Inflater
  17:         13855         723376  [S
  18:         17282         691280  java.lang.ref.Finalizer
  19:         25725         617400  java.lang.String
  20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
  22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
  24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  25:         16447         394728  java.util.zip.ZStreamRef
  26:           565         370080  [I
  27:           508         272288  <objArrayKlassKlass>
  28:         16233         259728  java.lang.Object
  29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
  30:          2524         192312  [Ljava.lang.Object;
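In case it helps to see the growth over time, this is roughly how I'm thinking of grabbing these snapshots periodically from inside the driver instead of logging into the YARN node by hand. It's only a sketch: the object name, the 5-minute interval, and the 35-line cutoff are made up, it assumes jmap from the same JDK is on the container's PATH, and it uses an slf4j logger like the one in my test job.

import java.lang.management.ManagementFactory
import java.util.concurrent.{Executors, TimeUnit}
import org.slf4j.LoggerFactory
import scala.sys.process._

object HistoDumper {
  private val logger = LoggerFactory.getLogger(getClass)
  // RuntimeMXBean name is "pid@hostname" on HotSpot
  private val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Log the top of the live-object histogram every 5 minutes.
  // Note: -histo:live forces a full GC, so it shouldn't run too often.
  def start(): Unit = {
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        val histo = Seq("jmap", "-histo:live", pid).!!
        logger.info(histo.lines.take(35).mkString("\n"))
      }
    }, 0, 5, TimeUnit.MINUTES)
  }
}

Calling HistoDumper.start() once at driver startup should give a histogram series I can diff to see which classes actually grow between batches.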
But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data, and I can't find a way to figure out what is in there.

Besides, I did some extra experiments, i.e. ran the same program in different environments to test whether it has the off-heap memory issue:

spark1.0 + standalone = no
spark1.0 + yarn = no
spark1.3 + standalone = no
spark1.3 + yarn = yes

I'm using CDH5.1, so the spark1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 was downloaded from the official website.

I could use spark1.0 + yarn, but I can't find a way to handle the logs (level and rolling), so it would explode the hard drive. Currently I'll stick to spark1.0 + standalone until our ops team decides to upgrade CDH.
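For the off-heap side, one thing I plan to try is logging the JVM's NIO buffer-pool MXBeans from the driver and executors, on the assumption that the Netty direct buffers hinted at by the PoolThreadCache entries in the histogram are allocated through NIO and therefore show up under the "direct" pool. This is just a sketch under that assumption (the object name is made up); if the allocation bypasses NIO, it won't show up here and I'd have to fall back to pmap.

import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import scala.collection.JavaConverters._

object OffHeapStats {
  // Summarizes the NIO buffer pools ("direct" and "mapped") tracked by the JVM.
  def report(): String = {
    val pools = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala
    pools.map { p =>
      s"${p.getName}: count=${p.getCount} used=${p.getMemoryUsed}B capacity=${p.getTotalCapacity}B"
    }.mkString("; ")
  }
}

If I log OffHeapStats.report() once per batch (e.g. inside a foreachRDD), the "direct" number should climb in step with the container memory if NIO direct buffers are the culprit.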
On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com> wrote:

> While the job is running, is it possible for you to log into the YARN node and
> get histograms of live objects using "jmap -histo:live"? That may reveal
> something.
>
> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>
>> Hi,
>>
>> Unfortunately, they're still growing, both driver and executors.
>>
>> I ran the same job in local mode and everything is fine.
>>
>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>>> Can you replace your counting part with this?
>>>
>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I wrote a simple test job; it only does very basic operations, for example:
>>>>
>>>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
>>>> val logs = lines.flatMap { line =>
>>>>   try {
>>>>     Some(parse(line).extract[Impression])
>>>>   } catch {
>>>>     case _: Exception => None
>>>>   }
>>>> }
>>>>
>>>> logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>   rdd.foreachPartition { iter =>
>>>>     iter.foreach(count => logger.info(count.toString))
>>>>   }
>>>> }
>>>>
>>>> It receives messages from Kafka, parses the JSON, filters and counts the
>>>> records, and then prints the counts to the logs.
>>>>
>>>> Thanks.
>>>>
>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> Hi Zhang,
>>>>>
>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>> inside the code to fill up memory.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Yes, I'm using createStream, but the storageLevel param is by default
>>>>>> MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing; I
>>>>>> don't think Kafka messages will be cached in the driver.
>>>>>>
>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Are you using the createStream or createDirectStream api? If it's the
>>>>>>> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
>>>>>>> slow things down though). Another way would be to try the latter one.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Akhil,
>>>>>>>>
>>>>>>>> Thanks for your reply. According to the Streaming tab of the Web UI, the
>>>>>>>> Processing Time is around 400ms and there's no Scheduling Delay, so I
>>>>>>>> suppose it's not the Kafka messages that eat up the off-heap memory. Or
>>>>>>>> maybe it is, but how to tell?
>>>>>>>>
>>>>>>>> I googled how to check the off-heap memory usage; there's a tool
>>>>>>>> called pmap, but I don't know how to interpret the results.
>>>>>>>>
>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> After submitting the job, if you do a ps aux | grep spark-submit
>>>>>>>>> then you can see all the JVM params. Are you using the high-level consumer
>>>>>>>>> (receiver based) for receiving data from Kafka? In that case, if your
>>>>>>>>> throughput is high and the processing delay exceeds the batch interval,
>>>>>>>>> then you will hit these memory issues, as the data keeps being received and
>>>>>>>>> is dumped to memory. You can set the StorageLevel to MEMORY_AND_DISK (but it
>>>>>>>>> slows things down). Another alternative would be to use the low-level Kafka
>>>>>>>>> consumer <https://github.com/dibbhatt/kafka-spark-consumer> or to
>>>>>>>>> use the non-receiver based directStream
>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>> that comes with Spark.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 in yarn-cluster mode. I
>>>>>>>>>> found that YARN is killing the driver and executor processes because of
>>>>>>>>>> excessive use of memory. Here's something I tried:
>>>>>>>>>>
>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so
>>>>>>>>>> the extra memory is not used by the heap.
>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is 384),
>>>>>>>>>> but the memory just keeps growing and then hits the limit.
>>>>>>>>>> 3. This problem does not show up in low-throughput jobs, nor in
>>>>>>>>>> standalone mode.
>>>>>>>>>> 4. The test job just receives messages from Kafka with a batch
>>>>>>>>>> interval of 1, does some filtering and aggregation, and then prints to
>>>>>>>>>> the executor logs, so it's not some 3rd-party library that causes the 'leak'.
>>>>>>>>>>
>>>>>>>>>> Spark 1.3 was built by myself, with the correct Hadoop versions.
>>>>>>>>>>
>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jerry

--
Jerry