I have had similar issues with some of my Spark jobs, especially when doing things like repartitioning.

spark.yarn.driver.memoryOverhead (default: driverMemory * 0.10, with a minimum of 384): The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).

I bumped the overhead memory as a way to work around the issue. Not sure if that is the best way, but it's how I got around it ;)
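For reference, this is roughly what the workaround looks like on the submit command. The executor-side setting, spark.yarn.executor.memoryOverhead, is the one the error message itself suggests; the numbers below are only illustrative starting points, not values tuned for your job:

```
# Sketch only: add the overhead settings to the existing spark-submit command
# (other flags elided). 1024 MB is just an example bump over the default of
# max(384 MB, 10% of executor memory); tune it against your container limit.
spark-submit --master yarn-cluster \
  ... \
  --executor-memory 3G \
  --conf "spark.yarn.executor.memoryOverhead=1024" \
  --conf "spark.yarn.driver.memoryOverhead=512" \
  ... \
  hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar
```

Keep in mind the YARN container has to hold executor memory plus the overhead, so if your cluster caps the container size you may need to lower --executor-memory a little when you raise the overhead.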

darin wrote:
Hi,
I got this exception after the streaming program had run for some hours.

```
User class threw exception: org.apache.spark.SparkException: Job aborted
due to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
```

I have googled some solutions, like turning off the YARN memory monitor or increasing executor memory... I don't think that is the right way.


And this is the submit script:
```
spark-submit --master yarn-cluster --driver-cores 1 --driver-memory 1G \
  --num-executors 6 --executor-cores 3 --executor-memory 3G \
  --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/javadump.hprof" \
  --conf "spark.kryoserializer.buffer.max=512m" \
  --class com.dtise.data.streaming.ad.DTStreamingStatistics \
  hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar
```

And this is the main code:

```
val originalStream = ssc.textFileStream(rawDataPath)
originalStream.repartition(10).mapPartitions(parseAdLog).reduceByKey(_ ++ _)
  .mapWithState(StateSpec.function(countAdLogWithState _))
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      val batchTime = Calendar.getInstance.getTimeInMillis
      val dimensionSumMap = rdd.map(_._1).reduce(_ ++ _)
      val nameList = rdd.map(_._2).reduce(_ ++ _).toList
      val jedis = RedisUtils.jedis()
      jedis.hmset(joinString("t_ad_dimension_sum", batchTime), dimensionSumMap)
      jedis.lpush(joinString("t_ad_name", batchTime), nameList: _*)
      jedis.set(joinString("t_ad", batchTime.toString), "OK")
      jedis.close()

      rdd.flatMap(_._3).foreachPartition(logInfoList => {
        val producter = new StringProducter
        for (logInfo <- logInfoList) {
          val logInfoArr = logInfo.split("\t", -1)
          val kafkaKey = "ad/" + logInfoArr(campaignIdIdx) + "/" + logInfoArr(logDateIdx)
          producter.send("cookedLog", kafkaKey, logInfo)
        }
        producter.close()
      })
    }
  })
```

These are the JVM heap results from MAT (Eclipse Memory Analyzer):

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095238%402x.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095254%402x.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095331%402x.png>

Anybody have any advice about this?
Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

