Hi TD,

Some observations:
1. If I submit the application using the spark-submit tool with *client* as the deploy mode, it works fine with a single master and a single worker (driver, master and worker all run on the same machine).

2. If I submit the application using the spark-submit tool with *client* as the deploy mode and a *single master with 2 workers* (driver, master and 1 worker run on the same machine; the other worker is on a different machine), it *crashes after some time with a StackOverflowError*:

15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0 (TID 5412)
java.lang.StackOverflowError
    at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
    at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
    at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)

3. If I submit the application using the spark-submit tool with *cluster* as the deploy mode, it *crashes after some time with an OutOfMemoryError* with a single master and a single worker (driver, master and worker all run on the same machine):

15/04/23 05:47:49 Executor: Exception in task 0.0 in stage 858.0 (TID 1464)
java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
    at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
    at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
    at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

Thanks,
Sourav

On Wed, Apr 22, 2015 at 11:40 PM, Tathagata Das <t...@databricks.com> wrote:

> It could very well be that your executor memory is not enough to store the
> state RDDs AND operate on the data. 1G per executor is quite low.
> Definitely give more memory. And have you tried increasing the number of
> partitions (specify number of partitions in updateStateByKey)?
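To reason about how much state the job holds, it may help to recall the per-batch contract that the Spark Streaming programming guide describes for updateStateByKey: in every batch the update function is called for every key that already has state, even if no new values arrived for it (it then receives an empty Seq), and a key is dropped only when the function returns None. The plain-Scala model below has no Spark dependency; the names UpdateStateModel, step and runningSum are hypothetical, and it only mimics that documented contract on in-memory Maps, not Spark's implementation. It does not model partitioning; on a real pair DStream, the partition count TD mentions can be passed through the updateStateByKey(updateFunc, numPartitions) overload.

```scala
// Minimal, Spark-free model of the documented per-batch contract of updateStateByKey.
// Assumption: this mirrors the semantics only, not how Spark actually stores state.
object UpdateStateModel {
  // One batch: visit every key that has state or new values; drop keys for which
  // updateFunc returns None.
  def step[K, V, S](
      state: Map[K, S],
      batch: Seq[(K, V)],
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): Map[K, S] = {
    // Group this batch's values by key (groupBy keeps arrival order within each key).
    val newValues: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Keys that already have state are visited even when nothing new arrived for them.
    val keys = state.keySet ++ newValues.keySet
    keys.toSeq.flatMap { k =>
      updateFunc(newValues.getOrElse(k, Seq.empty[V]), state.get(k)).map(s => k -> s)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical update function: running sum of deltas, key removed when it reaches 0.
    val runningSum = (values: Seq[Long], state: Option[Long]) => {
      val total = state.getOrElse(0L) + values.sum
      if (total == 0L) None else Some(total)
    }
    val afterBatch1 = step(Map.empty[String, Long], Seq("a" -> 1L, "b" -> 2L), runningSum)
    // "b" receives nothing in batch 2 but is still visited, so its state is kept.
    val afterBatch2 = step(afterBatch1, Seq("a" -> -1L), runningSum)
    println(afterBatch2.get("b"))      // Some(2)
    println(afterBatch2.contains("a")) // false
  }
}
```

In this model the state only shrinks when the update function returns None, so every key that keeps a non-None state is carried into each subsequent batch.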
> On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra <sourav.chan...@livestream.com> wrote:
>
>> Anyone?
>>
>> On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra <sourav.chan...@livestream.com> wrote:
>>
>>> Hi Olivier,
>>>
>>> The update function is as below:
>>>
>>> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
>>>   val previousCount = state.getOrElse((0L, 0L))._2
>>>   var startValue: IConcurrentUsers = ConcurrentViewers(0)
>>>   var currentCount = 0L
>>>   val lastIndexOfConcurrentUsers =
>>>     values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
>>>   val subList = values.slice(0, lastIndexOfConcurrentUsers)
>>>   val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
>>>   val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count
>>>
>>>   if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
>>>     logger.error(
>>>       s"Count using state updation $currentCountFromSubList, " +
>>>       s"ConcurrentUsers count $lastConcurrentViewersCount" +
>>>       s" resetting to $lastConcurrentViewersCount"
>>>     )
>>>     currentCount = lastConcurrentViewersCount
>>>   }
>>>   val remainingValuesList = values.diff(subList)
>>>   startValue = ConcurrentViewers(currentCount)
>>>   currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count
>>>
>>>   if (currentCount < 0) {
>>>     logger.error(
>>>       s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
>>>     )
>>>     currentCount = 0
>>>   }
>>>   // to stop pushing subsequent 0 after receiving first 0
>>>   if (currentCount == 0 && previousCount == 0) None
>>>   else Some((previousCount, currentCount))
>>> }
>>>
>>> trait IConcurrentUsers {
>>>   val count: Long
>>>   def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
>>> }
>>>
>>> object IConcurrentUsers {
>>>   def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
>>>     case (_, _: ConcurrentViewers) =>
>>>       ConcurrentViewers(b.count)
>>>     case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
>>>       ConcurrentViewers(a.count + b.count)
>>>     case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
>>>       ConcurrentViewers(a.count - b.count)
>>>   }
>>> }
>>>
>>> case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
>>> case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
>>> case class ConcurrentViewers(count: Long) extends IConcurrentUsers
>>>
>>> Also, the error stack trace copied from the executor logs is:
>>>
>>> java.lang.OutOfMemoryError: Java heap space
>>>     at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
>>>     at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
>>>     at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
>>>     at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
>>>     at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
>>>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>>>     at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:601)
>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
>>>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>>     at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
>>>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
>>>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>>>     at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
>>>     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:601)
>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>>> 15/04/21 15:51:23 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
>>>
>>> On Wed, Apr 22, 2015 at 1:32 AM, Olivier Girardot <ssab...@gmail.com> wrote:
>>>
>>>> Hi Sourav,
>>>> Can you post your updateFunc as well, please?
>>>>
>>>> Regards,
>>>>
>>>> Olivier.
>>>>
>>>> On Tue, Apr 21, 2015 at 12:48, Sourav Chandra <sourav.chan...@livestream.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are building a Spark Streaming application that reads from Kafka, does updateStateByKey based on the received message type, and finally stores the results in Redis.
>>>>>
>>>>> After running for a few seconds, the executor process gets killed with an OutOfMemoryError.
>>>>>
>>>>> The code snippet is below:
>>>>>
>>>>> val NoOfReceiverInstances = 1
>>>>>
>>>>> val kafkaStreams = (1 to NoOfReceiverInstances).map(
>>>>>   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
>>>>> )
>>>>> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}
>>>>>
>>>>> ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...)..updateStateByKey(updateFunc)
>>>>>   .foreachRDD(_.foreachPartition(RedisHelper.update(_)))
>>>>>
>>>>> object RedisHelper {
>>>>>   private val client = scredis.Redis(
>>>>>     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
>>>>>   )
>>>>>
>>>>>   def update(itr: Iterator[(String, (Long, Long))]): Unit = {
>>>>>     // redis save operation
>>>>>   }
>>>>> }
>>>>>
>>>>> Below is the Spark configuration:
>>>>>
>>>>>   spark.app.name = "XXXXXXX"
>>>>>   spark.jars = "xxxx.jar"
>>>>>   spark.home = "/spark-1.1.1-bin-hadoop2.4"
>>>>>   spark.executor.memory = 1g
>>>>>   spark.streaming.concurrentJobs = 1000
>>>>>   spark.logConf = true
>>>>>   spark.cleaner.ttl = 3600 // in seconds
>>>>>   spark.default.parallelism = 12
>>>>>   spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"
>>>>>   spark.executor.logs.rolling.strategy = "size"
>>>>>   spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
>>>>>   spark.executor.logs.rolling.maxRetainedFiles = 10
>>>>>   spark.serializer = "org.apache.spark.serializer.KryoSerializer"
>>>>>   spark.kryo.registrator = "xxx.NoOpKryoRegistrator"
>>>>>
>>>>> Other configurations are below:
>>>>>
>>>>> streaming {
>>>>>   // All streaming context related configs should come here
>>>>>   batch-duration = "1 second"
>>>>>   checkpoint-directory = "/tmp"
>>>>>   checkpoint-duration = "10 seconds"
>>>>>   slide-duration = "1 second"
>>>>>   window-duration = "1 second"
>>>>>   partitions-for-shuffle-task = 32
>>>>> }
>>>>> kafka {
>>>>>   no-of-receivers = 1
>>>>>   zookeeper-quorum = "xxxx:2181"
>>>>>   consumer-group = "xxxxx"
>>>>>   topic = "xxxxx:2"
>>>>> }
>>>>>
>>>>> We tried different combinations, such as:
>>>>> - Spark 1.1.0 and 1.1.1
>>>>> - increasing the executor memory
>>>>> - changing the serialization strategy (switching between Kryo and plain Java serialization)
>>>>> - changing the broadcast strategy (switching between HTTP and torrent broadcast)
>>>>>
>>>>> Can anyone give any insight into what we are missing here? How can we fix this?
>>>>>
>>>>> Due to an Akka version mismatch with some other libraries, we cannot upgrade the Spark version.
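One thing that may be worth ruling out in the updateFunc posted in reply to Olivier: when a call contains no ConcurrentViewers message (which includes the case where the function is invoked with an empty values Seq for a key that only has state), lastIndexWhere returns -1 and values(lastIndexOfConcurrentUsers) throws an IndexOutOfBoundsException. The sketch below is a hypothetical, Spark-free rewrite of the same counting rule, not the poster's code. It swaps the trait-plus-op pattern for a sealed trait so the compiler checks that every message type is handled, starts from the last ConcurrentViewers snapshot when there is one, and, as an assumption about the intended behaviour, otherwise starts from the previous count. The mismatch logging is left out; GuardedUpdate and applyMessage are illustrative names.

```scala
// Hypothetical, Spark-free sketch of the counting rule from the thread's updateFunc.
// The message types mirror the thread's case classes; the sealed trait is a swapped-in design.
sealed trait IConcurrentUsers { def count: Long }
final case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
final case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
final case class ConcurrentViewers(count: Long) extends IConcurrentUsers

object GuardedUpdate {
  // Applies one message to a running count; a ConcurrentViewers snapshot replaces it.
  private def applyMessage(count: Long, msg: IConcurrentUsers): Long = msg match {
    case ConcurrentViewers(c)          => c
    case IncrementConcurrentViewers(c) => count + c
    case DecrementConcurrentViewers(c) => count - c
  }

  // State is (count before this batch, count after this batch), as in the thread.
  val updateFunc: (Seq[IConcurrentUsers], Option[(Long, Long)]) => Option[(Long, Long)] =
    (values, state) => {
      val previousCount = state.map(_._2).getOrElse(0L)
      val lastSnapshot = values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
      // Start from the last snapshot if present; otherwise (assumption) from the previous count.
      val (start, rest) =
        if (lastSnapshot >= 0) (values(lastSnapshot).count, values.drop(lastSnapshot + 1))
        else (previousCount, values)
      // Negative results are clamped to 0, as in the original.
      val currentCount = math.max(0L, rest.foldLeft(start)(applyMessage))
      // Same rule as the original: stop emitting once both counts are 0.
      if (currentCount == 0L && previousCount == 0L) None
      else Some((previousCount, currentCount))
    }

  def main(args: Array[String]): Unit = {
    println(updateFunc(Seq(ConcurrentViewers(5), IncrementConcurrentViewers(2)), None)) // Some((0,7))
    println(updateFunc(Seq.empty, Some((3L, 4L))))                                     // Some((4,4))
    println(updateFunc(Seq(DecrementConcurrentViewers(10)), Some((0L, 3L))))          // Some((3,0))
  }
}
```

When a snapshot is present this follows the same steps as the posted function (reset to the last snapshot, apply the later increments and decrements, clamp at 0); only the no-snapshot branch is new behaviour.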
>>>>>
>>>>> Thanks,
>>>>> --
>>>>> Sourav Chandra
>>>>> Senior Software Engineer
>>>>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>>>> sourav.chan...@livestream.com
>>>>> o: +91 80 4121 8723
>>>>> m: +91 988 699 3746
>>>>> skype: sourav.chandra
>>>>> Livestream
>>>>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,
>>>>> Bangalore 560034
>>>>> www.livestream.com
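The StackOverflowError in observation 2 is a long, repeating chain of ObjectInputStream readObject0 / defaultReadFields / readSerialData frames. Java serialization produces this pattern when it walks a deeply nested object graph: each level of nesting adds a few stack frames, so a graph that keeps getting deeper eventually exceeds the thread's stack. A commonly cited source of such depth in Spark Streaming is an RDD lineage that grows with every batch. The streaming guide requires a checkpoint directory (ssc.checkpoint(...)) for updateStateByKey and recommends fault-tolerant storage such as HDFS for it, so it may be worth confirming that checkpoints are really being written when a worker runs on a second machine, given that checkpoint-directory points at the local path /tmp. The demo below does not use Spark. Link and DeepGraphDemo are hypothetical names, the chain is only a stand-in for such a graph, and it illustrates the serialization mechanism, not the cause of this particular crash.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a deeply nested object graph: each Link points at the previous one.
final case class Link(id: Int, parent: Option[Link])

object DeepGraphDemo {
  // Builds the chain with a loop, so construction itself does not recurse.
  def chain(depth: Int): Link = {
    var current = Link(0, None)
    var i = 1
    while (i < depth) {
      current = Link(i, Some(current))
      i += 1
    }
    current
  }

  // Java-serializes and deserializes the chain. Both directions recurse once per level,
  // so a deep enough chain exhausts the thread's stack.
  def roundTrip(depth: Int): Either[String, Int] =
    try {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(chain(depth))
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      try Right(in.readObject().asInstanceOf[Link].id)
      finally in.close()
    } catch {
      case _: StackOverflowError => Left("StackOverflowError")
    }

  def main(args: Array[String]): Unit = {
    println(roundTrip(100))     // Right(99)
    println(roundTrip(1000000)) // expected Left(StackOverflowError) with default JVM stack sizes
  }
}
```

Raising the thread stack size only moves the limit. Keeping the graph shallow, which for a streaming lineage is the job of checkpointing, is what keeps the serialized depth bounded.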