Hi,

Thanks for you reply. Here's the top 30 entries of jmap -histo:live result:

 num     #instances         #bytes  class name
----------------------------------------------
   1:         40802      145083848  [B
   2:         99264       12716112  <methodKlass>
   3:         99264       12291480  <constMethodKlass>
   4:          8472        9144816  <constantPoolKlass>
   5:          8472        7625192  <instanceKlassKlass>
   6:           186        6097824
 [Lscala.concurrent.forkjoin.ForkJoinTask;
   7:          7045        4804832  <constantPoolCacheKlass>
   8:        139168        4453376  java.util.HashMap$Entry
   9:          9427        3542512  <methodDataKlass>
  10:        141312        3391488
 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  11:        135491        3251784  java.lang.Long
  12:         26192        2765496  [C
  13:           813        1140560  [Ljava.util.HashMap$Entry;
  14:          8997        1061936  java.lang.Class
  15:         16022         851384  [[I
  16:         16447         789456  java.util.zip.Inflater
  17:         13855         723376  [S
  18:         17282         691280  java.lang.ref.Finalizer
  19:         25725         617400  java.lang.String
  20:           320         570368
 [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  21:         16066         514112
 java.util.concurrent.ConcurrentHashMap$HashEntry
  22:         12288         491520
 org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  23:         13343         426976
 java.util.concurrent.locks.ReentrantLock$NonfairSync
  24:         12288         396416
 [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  25:         16447         394728  java.util.zip.ZStreamRef
  26:           565         370080  [I
  27:           508         272288  <objArrayKlassKlass>
  28:         16233         259728  java.lang.Object
  29:           771         209232
 [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
  30:          2524         192312  [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK, the extra memory is
consumed by some off-heap data. I can't find a way to figure out what is in
there.

Besides, I did some extra experiments, i.e. run the same program in
difference environments to test whether it has off-heap memory issue:

spark1.0 + standalone = no
spark1.0 + yarn = no
spark1.3 + standalone = no
spark1.3 + yarn = yes

I'm using CDH5.1, so the spark1.0 is provided by cdh, and
spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

I could use spark1.0 + yarn, but I can't find a way to handle the logs,
level and rolling, so it'll explode the harddrive.

Currently I'll stick to spark1.0 + standalone, until our ops team decides
to upgrade cdh.



On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com> wrote:

> While you are running is it possible for you login into the YARN node and
> get histograms of live objects using "jmap -histo:live". That may reveal
> something.
>
>
> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>
>> Hi,
>>
>> Unfortunately, they're still growing, both driver and executors.
>>
>> I run the same job with local mode, everything is fine.
>>
>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Can you replace your counting part with this?
>>>
>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>>>
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I wrote a simple test job, it only does very basic operations. for
>>>> example:
>>>>
>>>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic
>>>> -> 1)).map(_._2)
>>>>     val logs = lines.flatMap { line =>
>>>>       try {
>>>>         Some(parse(line).extract[Impression])
>>>>       } catch {
>>>>         case _: Exception => None
>>>>       }
>>>>     }
>>>>
>>>>     logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>       rdd.foreachPartition { iter =>
>>>>         iter.foreach(count => logger.info(count.toString))
>>>>       }
>>>>     }
>>>>
>>>> It receives messages from Kafka, parse the json, filter and count the
>>>> records, and then print it to logs.
>>>>
>>>> Thanks.
>>>>
>>>>
>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Hi Zhang,
>>>>>
>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>> inside the code to fill up memory.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Yes, I'm using createStream, but the storageLevel param is by default
>>>>>> MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
>>>>>> don't think Kafka messages will be cached in driver.
>>>>>>
>>>>>>
>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <
>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Are you using the createStream or createDirectStream api? If its the
>>>>>>> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it 
>>>>>>> might
>>>>>>> slow things down though). Another way would be to try the later one.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Akhil,
>>>>>>>>
>>>>>>>> Thanks for your reply. Accoding to the Streaming tab of Web UI, the
>>>>>>>> Processing Time is around 400ms, and there's no Scheduling Delay, so I
>>>>>>>> suppose it's not the Kafka messages that eat up the off-heap memory. Or
>>>>>>>> maybe it is, but how to tell?
>>>>>>>>
>>>>>>>> I googled about how to check the off-heap memory usage, there's a
>>>>>>>> tool called pmap, but I don't know how to interprete the results.
>>>>>>>>
>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <
>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> After submitting the job, if you do a ps aux | grep spark-submit
>>>>>>>>> then you can see all JVM params. Are you using the highlevel consumer
>>>>>>>>> (receiver based) for receiving data from Kafka? In that case if your
>>>>>>>>> throughput is high and the processing delay exceeds batch interval 
>>>>>>>>> then you
>>>>>>>>> will hit this memory issues as the data will keep on receiving and is
>>>>>>>>> dumped to memory. You can set StorageLevel to MEMORY_AND_DISK (but it 
>>>>>>>>> slows
>>>>>>>>> things down). Another alternate will be to use the lowlevel kafka
>>>>>>>>> consumer <https://github.com/dibbhatt/kafka-spark-consumer> or to
>>>>>>>>> use the non-receiver based directStream
>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>> that comes up with spark.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I
>>>>>>>>>> find out that YARN is killing the driver and executor process 
>>>>>>>>>> because of
>>>>>>>>>> excessive use of memory. Here's something I tried:
>>>>>>>>>>
>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so
>>>>>>>>>> the extra memory is not used by heap.
>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is 384),
>>>>>>>>>> but the memory just keeps growing and then hits the limit.
>>>>>>>>>> 3. This problem is not shown in low-throughput jobs, neither in
>>>>>>>>>> standalone mode.
>>>>>>>>>> 4. The test job just receives messages from Kafka, with batch
>>>>>>>>>> interval of 1, do some filtering and aggregation, and then print to
>>>>>>>>>> executor logs. So it's not some 3rd party library that causes the 
>>>>>>>>>> 'leak'.
>>>>>>>>>>
>>>>>>>>>> Spark 1.3 is built by myself, with correct hadoop versions.
>>>>>>>>>>
>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jerry
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jerry
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jerry
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jerry
>>>>
>>>
>>>
>>
>>
>> --
>> Jerry
>>
>


-- 
Jerry

Reply via email to