Hi,

I wrote a simple test job that only does very basic operations, for example:

    // assuming json4s for parse/extract; Impression is a case class
    // defined elsewhere with an s_id field
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    implicit val formats = DefaultFormats

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
      Map(topic -> 1)).map(_._2)

    val logs = lines.flatMap { line =>
      try {
        Some(parse(line).extract[Impression])
      } catch {
        case _: Exception => None
      }
    }

    logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        iter.foreach(count => logger.info(count.toString))
      }
    }

It receives messages from Kafka, parses the JSON, filters and counts the
records, and then prints the counts to the logs.

Thanks.


On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Hi Zhang,
>
> Could you paste your code in a gist? Not sure what you are doing inside
> the code to fill up memory.
>
> Thanks
> Best Regards
>
> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>
>> Hi,
>>
>> Yes, I'm using createStream, but the storageLevel param already defaults
>> to MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing,
>> and I don't think Kafka messages would be cached in the driver.
>>
>>
>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Are you using the createStream or createDirectStream API? If it's the
>>> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
>>> slow things down though). Another way would be to try the latter, as
>>> sketched below.
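>>>
>>> For the former, a minimal sketch (untested; reuses your existing ssc,
>>> zkQuorum, group and topic values):
>>>
>>>     import org.apache.spark.storage.StorageLevel
>>>     import org.apache.spark.streaming.kafka.KafkaUtils
>>>
>>>     // pass the StorageLevel explicitly instead of the
>>>     // MEMORY_AND_DISK_SER_2 default
>>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>       Map(topic -> 1), StorageLevel.MEMORY_AND_DISK).map(_._2)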
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi Akhil,
>>>>
>>>> Thanks for your reply. According to the Streaming tab of the Web UI,
>>>> the Processing Time is around 400ms and there's no Scheduling Delay, so
>>>> I suppose it's not the Kafka messages that eat up the off-heap memory.
>>>> Or maybe it is, but how can I tell?
>>>>
>>>> I googled how to check off-heap memory usage; there's a tool called
>>>> pmap, but I don't know how to interpret the results.
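>>>>
>>>> (Something like `pmap -x <pid> | sort -n -k3` sorts the mappings by the
>>>> RSS column, but the big anonymous regions there don't say what
>>>> allocated them.)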
>>>>
>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> After submitting the job, if you do a ps aux | grep spark-submit you
>>>>> can see all the JVM params. Are you using the high-level consumer
>>>>> (receiver-based) for receiving data from Kafka? In that case, if your
>>>>> throughput is high and the processing delay exceeds the batch interval,
>>>>> you will hit these memory issues, as data keeps arriving and is dumped
>>>>> into memory. You can set the StorageLevel to MEMORY_AND_DISK (but it
>>>>> slows things down). Another alternative would be to use the low-level
>>>>> kafka consumer <https://github.com/dibbhatt/kafka-spark-consumer> or the
>>>>> non-receiver based directStream
>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>> that ships with Spark, as sketched below.
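>>>>>
>>>>> A minimal directStream sketch (untested; assumes a `brokers` string
>>>>> listing your Kafka brokers, e.g. "host1:9092,host2:9092"):
>>>>>
>>>>>     import kafka.serializer.StringDecoder
>>>>>     import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>
>>>>>     val kafkaParams = Map("metadata.broker.list" -> brokers)
>>>>>     // no receiver, so received blocks are not buffered in
>>>>>     // executor memory
>>>>>     val lines = KafkaUtils.createDirectStream[String, String,
>>>>>       StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
>>>>>       .map(_._2)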
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 in yarn-cluster mode. I've
>>>>>> found that YARN is killing the driver and executor processes because
>>>>>> of excessive memory use. Here's what I tried:
>>>>>>
>>>>>> 1. Xmx is set to 512M and the GC looks fine (one young GC per 10s), so
>>>>>> the extra memory is not used by the heap.
>>>>>> 2. I set the two memoryOverhead params to 1024 (default is 384; the
>>>>>> exact spark-submit flags are shown below), but the memory just keeps
>>>>>> growing until it hits the limit.
>>>>>> 3. The problem does not show up in low-throughput jobs, nor in
>>>>>> standalone mode.
>>>>>> 4. The test job just receives messages from Kafka with a batch
>>>>>> interval of 1 second, does some filtering and aggregation, and then
>>>>>> prints to the executor logs. So it's not some 3rd-party library that
>>>>>> causes the 'leak'.
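>>>>>>
>>>>>> For reference, the params in item 2 are set on the spark-submit
>>>>>> command line:
>>>>>>
>>>>>>     --conf spark.yarn.driver.memoryOverhead=1024 \
>>>>>>     --conf spark.yarn.executor.memoryOverhead=1024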
>>>>>>
>>>>>> I built Spark 1.3 myself, against the correct Hadoop version.
>>>>>>
>>>>>> Any ideas will be appreciated.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> --
>>>>>> Jerry
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jerry
>>>>
>>>
>>>
>>
>>
>> --
>> Jerry
>>
>
>


-- 
Jerry
