Hi,

I wrote a simple test job that only does very basic operations, for example:
// one receiver-based Kafka stream; keep only the message payload
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)

// parse each line as JSON, dropping records that fail to parse
val logs = lines.flatMap { line =>
  try {
    Some(parse(line).extract[Impression])
  } catch {
    case _: Exception => None
  }
}

// count the records with a positive s_id and write the count to the executor logs
logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    iter.foreach(count => logger.info(count.toString))
  }
}

It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the counts to the logs.
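By the way, I'm going to try the direct stream you suggested below. If I read the 1.3 API correctly, the change on my side would look roughly like this (an untested sketch; brokers is a placeholder for our broker list):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// direct (receiver-less) stream: offsets are tracked by Spark and data is
// read inside the tasks instead of being buffered by a receiver
val kafkaParams = Map("metadata.broker.list" -> brokers)
val lines = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
  .map(_._2)

Since there is no receiver, received blocks shouldn't pile up in memory while a batch waits to be processed.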
Thanks.

On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Hi Zhang,
>
> Could you paste your code in a gist? Not sure what you are doing inside
> the code to fill up memory.
>
> Thanks
> Best Regards
>
> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>
>> Hi,
>>
>> Yes, I'm using createStream, but the storageLevel param is by default
>> MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing; I
>> don't think Kafka messages will be cached in the driver.
>>
>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Are you using the createStream or createDirectStream API? If it's the
>>> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it
>>> might slow things down though). Another way would be to try the latter.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi Akhil,
>>>>
>>>> Thanks for your reply. According to the Streaming tab of the Web UI,
>>>> the Processing Time is around 400ms and there's no Scheduling Delay,
>>>> so I suppose it's not the Kafka messages that eat up the off-heap
>>>> memory. Or maybe it is, but how can I tell?
>>>>
>>>> I googled for ways to check off-heap memory usage; there's a tool
>>>> called pmap, but I don't know how to interpret its results.
>>>>
>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> After submitting the job, if you do a ps aux | grep spark-submit you
>>>>> can see all the JVM params. Are you using the high-level consumer
>>>>> (receiver based) for receiving data from Kafka? In that case, if your
>>>>> throughput is high and the processing delay exceeds the batch
>>>>> interval, you will hit this memory issue, as data keeps being
>>>>> received and dumped to memory. You can set the StorageLevel to
>>>>> MEMORY_AND_DISK (but it slows things down). Another alternative is to
>>>>> use the low-level Kafka consumer
>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or the
>>>>> non-receiver based directStream
>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>> that ships with Spark.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 in yarn-cluster mode. I
>>>>>> found that YARN is killing the driver and executor processes because
>>>>>> of excessive use of memory. Here's what I have tried:
>>>>>>
>>>>>> 1. Xmx is set to 512M and the GC looks fine (one young GC per 10s),
>>>>>> so the extra memory is not used by the heap.
>>>>>> 2. I set the two memoryOverhead params to 1024 (default is 384), but
>>>>>> the memory just keeps growing until it hits the limit.
>>>>>> 3. The problem does not show up in low-throughput jobs, nor in
>>>>>> standalone mode.
>>>>>> 4. The test job just receives messages from Kafka, with a batch
>>>>>> interval of 1, does some filtering and aggregation, and then prints
>>>>>> to the executor logs. So it's not some third-party library that
>>>>>> causes the 'leak'.
>>>>>>
>>>>>> Spark 1.3 was built by myself, with the correct Hadoop versions.
>>>>>>
>>>>>> Any ideas would be appreciated.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> --
>>>>>> Jerry
>>>>>
>>>>
>>>> --
>>>> Jerry
>>>
>>
>> --
>> Jerry
>

--
Jerry
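P.S. If the direct stream doesn't work out, I'll fall back to your other suggestion and set the StorageLevel explicitly on the receiver-based stream. If I understand the API correctly, that would be (again an untested sketch):

import org.apache.spark.storage.StorageLevel

// receiver-based stream, but spilling received blocks to disk instead of
// keeping two serialized copies in memory (the MEMORY_AND_DISK_SER_2 default)
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1),
  StorageLevel.MEMORY_AND_DISK).map(_._2)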