Re: Spark Streaming updateStateByKey throws OutOfMemory Error
Hi TD,

Some observations:

1. If I submit the application using the spark-submit tool with client as deploy mode, it works fine with a single master and worker (driver, master and worker running on the same machine).

2. If I submit the application using the spark-submit tool with client as deploy mode, a single master and 2 workers (driver, master and 1 worker on the same machine, the other worker on a different machine), it crashes after some time with a StackOverflowError:

15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0 (TID 5412)
java.lang.StackOverflowError
    at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
    at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
    at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)

3. If I submit the
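For what it's worth, the repeated `scala.collection.immutable.$colon$colon.readObject` frames are the signature of Java serialization recursing one stack frame per list cell: reading back a very long linked list nests one `readObject` call per element, so the depth grows with the list length and eventually overflows the stack regardless of heap size. A minimal, Spark-free sketch of that failure shape (the `Cons` and `DeepListDemo` names and the sizes are illustrative, not from this thread):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// A hand-rolled cons list relying on default Java serialization. Deserializing
// a chain of N cells nests N readObject calls on the stack -- the same shape
// as the $colon$colon.readObject frames in the trace above.
case class Cons(head: Int, tail: Option[Cons]) extends Serializable

object DeepListDemo {
  // Build a chain of n cells iteratively (foldLeft does not recurse).
  def build(n: Int): Cons =
    (1 until n).foldLeft(Cons(0, None))((acc, i) => Cons(i, Some(acc)))

  // Serialize and deserialize the chain; both directions recurse per cell.
  def roundTrip(c: Cons): Cons = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(c)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[Cons]
  }
}
```

`roundTrip(DeepListDemo.build(100))` is fine, but a long enough chain overflows the stack. In updateStateByKey jobs a list like this is typically part of an ever-growing lineage, which is why the commonly suggested mitigation is enabling checkpointing (`ssc.checkpoint(dir)`) so the chain cannot grow without bound.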
Re: Spark Streaming updateStateByKey throws OutOfMemory Error

*bump*

On Thu, Apr 23, 2015 at 3:46 PM, Sourav Chandra sourav.chan...@livestream.com wrote: [quoted text of the previous message omitted]
Re: Spark Streaming updateStateByKey throws OutOfMemory Error

It could very well be that your executor memory is not enough to store the state RDDs AND operate on the data. 1G per executor is quite low. Definitely give more memory. And have you tried increasing the number of partitions (specify the number of partitions in updateStateByKey)?

On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra sourav.chan...@livestream.com wrote:

Anyone?

On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra sourav.chan...@livestream.com wrote:

Hi Olivier,

The update function is as below:

val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
  val previousCount = state.getOrElse((0L, 0L))._2
  var startValue: IConcurrentUsers = ConcurrentViewers(0)
  var currentCount = 0L
  val lastIndexOfConcurrentUsers =
    values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
  val subList = values.slice(0, lastIndexOfConcurrentUsers)
  val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
  val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count

  if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
    logger.error(
      s"Count using state updation $currentCountFromSubList, " +
      s"ConcurrentUsers count $lastConcurrentViewersCount" +
      s" resetting to $lastConcurrentViewersCount"
    )
    currentCount = lastConcurrentViewersCount
  }

  val remainingValuesList = values.diff(subList)
  startValue = ConcurrentViewers(currentCount)
  currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count

  if (currentCount < 0) {
    logger.error(
      s"ERROR: Got new count $currentCount < 0, value: $values, state: $state, resetting to 0"
    )
    currentCount = 0
  }

  // to stop pushing subsequent 0 after receiving first 0
  if (currentCount == 0 && previousCount == 0) None
  else Some(previousCount, currentCount)
}

trait IConcurrentUsers {
  val count: Long
  def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
}

object IConcurrentUsers {
  def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
    case (_, _: ConcurrentViewers) =>
      ConcurrentViewers(b.count)
    case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
      ConcurrentViewers(a.count + b.count)
    case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
      ConcurrentViewers(a.count - b.count)
  }
}

case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class ConcurrentViewers(count: Long) extends IConcurrentUsers

Also, the error stack trace copied from the executor logs is:

java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
    at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
    at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
    at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
    at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
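The folds in updateFunc rely on `IConcurrentUsers.op` treating a `ConcurrentViewers` value on the right as an absolute snapshot that overrides whatever was accumulated so far. That algebra can be checked on its own, without any Spark wiring; this sketch mirrors the definitions from the message above (the sample event sequence is illustrative):

```scala
trait IConcurrentUsers {
  val count: Long
  def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
}

object IConcurrentUsers {
  // Note: the match is only defined for folds whose accumulator is a
  // ConcurrentViewers, which holds because the fold seeds with one.
  def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
    case (_, _: ConcurrentViewers)                             => ConcurrentViewers(b.count) // snapshot wins
    case (_: ConcurrentViewers, _: IncrementConcurrentViewers) => ConcurrentViewers(a.count + b.count)
    case (_: ConcurrentViewers, _: DecrementConcurrentViewers) => ConcurrentViewers(a.count - b.count)
  }
}

case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class ConcurrentViewers(count: Long) extends IConcurrentUsers

// Folding one batch of events from an absolute starting snapshot:
val events = Seq(
  IncrementConcurrentViewers(3),  // 0 + 3 = 3
  DecrementConcurrentViewers(1),  // 3 - 1 = 2
  ConcurrentViewers(10),          // snapshot resets the running count to 10
  IncrementConcurrentViewers(2)   // 10 + 2 = 12
)
val folded = events.foldLeft(ConcurrentViewers(0): IConcurrentUsers)(_ op _)
// folded.count == 12
```

This is why updateFunc splits `values` at the last `ConcurrentViewers`: everything before the last snapshot is only used for the sanity check, and the fold that produces the new state starts from that snapshot.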
Re: Spark Streaming updateStateByKey throws OutOfMemory Error

Anyone?

On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra sourav.chan...@livestream.com wrote: [quoted message with the updateFunc code and executor stack trace, repeated above, omitted]
Re: Spark Streaming updateStateByKey throws OutOfMemory Error

Hi Sourav,

Can you post your updateFunc as well please?

Regards,
Olivier.

On Tue, Apr 21, 2015 at 12:48 PM, Sourav Chandra sourav.chan...@livestream.com wrote:

Hi,

We are building a Spark Streaming application which reads from Kafka, does updateStateByKey based on the received message type, and finally stores into Redis. After running for a few seconds the executor process gets killed with an OutOfMemory error.

The code snippet is below:

val NoOfReceiverInstances = 1

val kafkaStreams = (1 to NoOfReceiverInstances).map(
  _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
)

val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}

ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...).updateStateByKey(updateFunc).foreachRDD(_.foreachPartition(RedisHelper.update(_)))

object RedisHelper {
  private val client = scredis.Redis(
    ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
  )

  def update(itr: Iterator[(String, (Long, Long))]) {
    // redis save operation
  }
}

Below is the spark configuration:

spark.app.name = XXX
spark.jars = .jar
spark.home = /spark-1.1.1-bin-hadoop2.4
spark.executor.memory = 1g
spark.streaming.concurrentJobs = 1000
spark.logConf = true
spark.cleaner.ttl = 3600 // in seconds
spark.default.parallelism = 12
spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError
spark.executor.logs.rolling.strategy = size
spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
spark.executor.logs.rolling.maxRetainedFiles = 10
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator = xxx.NoOpKryoRegistrator

Other configurations are below:

streaming {
  // All streaming context related configs should come here
  batch-duration = 1 second
  checkpoint-directory = /tmp
  checkpoint-duration = 10 seconds
  slide-duration = 1 second
  window-duration = 1 second
  partitions-for-shuffle-task = 32
}

kafka {
  no-of-receivers = 1
  zookeeper-quorum = :2181
  consumer-group = x
  topic = x:2
}

We tried different combinations like:
- Spark 1.1.0 and 1.1.1
- increasing executor memory
- changing the serialization strategy (switching between Kryo and plain Java)
- changing the broadcast strategy (switching between HTTP and torrent broadcast)

Can anyone give any insight into what we are missing here? How can we fix this? Due to an akka version mismatch with some other libraries we cannot upgrade the Spark version.

Thanks,

Sourav Chandra
Senior Software Engineer
sourav.chan...@livestream.com
Livestream
www.livestream.com
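For readers following the thread, the per-key contract the pipeline above depends on can be modeled outside Spark. This is a simplified single-process sketch, not Spark's implementation; `step` and the toy `sum` function are hypothetical names. It shows the behavior updateFunc's `currentCount == 0 && previousCount == 0` branch relies on: returning None drops the key from state instead of keeping a dead entry forever.

```scala
// Simplified model of updateStateByKey's per-key contract: each batch, the
// update function sees the new values for a key plus its previous state;
// returning None removes the key from the state map.
def step[K, V, S](
    batch: Map[K, Seq[V]],
    state: Map[K, S],
    update: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
  val keys = batch.keySet ++ state.keySet // keys with new data OR old state
  keys.flatMap { k =>
    update(batch.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
  }.toMap
}

// Toy update function: running sum; drop the key once the sum reaches 0.
val sum: (Seq[Int], Option[Int]) => Option[Int] = (vs, s) => {
  val total = s.getOrElse(0) + vs.sum
  if (total == 0) None else Some(total)
}

val s1 = step(Map("a" -> Seq(2, 3), "b" -> Seq(1)), Map.empty[String, Int], sum)
// s1 == Map("a" -> 5, "b" -> 1)
val s2 = step(Map("b" -> Seq(-1)), s1, sum)
// s2 == Map("a" -> 5): "b" summed to 0 and was dropped from state
```

Note that every batch touches every key with surviving state, which is why the state RDD keeps growing (and memory with it) unless keys are eventually dropped via None or the lineage is truncated by checkpointing.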