Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-23 Thread Sourav Chandra
Hi TD,

Some observations:

1. If I submit the application using the spark-submit tool with *client as
deploy mode*, it works fine with a single master and worker (driver, master,
and worker all running on the same machine).
2. If I submit the application using the spark-submit tool with client as
deploy mode, it *crashes after some time with a StackOverflowError* when using
a *single master and 2 workers* (driver, master, and 1 worker running on the
same machine; the other worker on a different machine):
 15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0 (TID 5412)
 java.lang.StackOverflowError
     at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
     at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
     at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:606)
     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:606)
     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
     at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)


3. If I submit the 

Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-23 Thread Sourav Chandra
*bump*


Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-22 Thread Tathagata Das
It could very well be that your executor memory is not enough to store the
state RDDs AND operate on the data. 1G per executor is quite low, so
definitely give the executors more memory. Have you also tried increasing the
number of partitions (by specifying the number of partitions in
updateStateByKey)?
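
To make the partitioning suggestion concrete: updateStateByKey has an overload that takes the number of partitions as a second argument, `updateStateByKey(updateFunc, numPartitions)`, and state is spread across partitions by key hash, so more partitions should mean fewer keys (and less state) per task. The sketch below is plain Scala with no Spark dependency; `PartitionSpreadSketch` and its methods are hypothetical names, and the non-negative modulo only mirrors what a hash partitioner does rather than calling Spark itself.

```scala
object PartitionSpreadSketch {
  // Non-negative modulo of the key's hashCode, mirroring how a hash
  // partitioner assigns a key to one of numPartitions partitions.
  def partitionOf[K](key: K, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Number of keys (and therefore state entries) that land in each partition.
  def keysPerPartition[K](keys: Seq[K], numPartitions: Int): Map[Int, Int] =
    keys
      .groupBy(k => partitionOf(k, numPartitions))
      .map { case (partition, ks) => partition -> ks.size }
}
```

For example, `PartitionSpreadSketch.keysPerPartition(0 until 9600, 12)` puts 800 keys in each partition, while the same keys over 32 partitions give 300 each. Real keys will not spread this evenly, and more partitions does not reduce the total state held per executor, only the amount each task handles at once.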


Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-22 Thread Sourav Chandra
Anyone?

On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 Hi Olivier,

 The update function is as below:

 val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
   val previousCount = state.getOrElse((0L, 0L))._2
   var startValue: IConcurrentUsers = ConcurrentViewers(0)
   var currentCount = 0L
   val lastIndexOfConcurrentUsers =
     values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
   val subList = values.slice(0, lastIndexOfConcurrentUsers)
   val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
   val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count

   if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
     logger.error(
       s"Count using state updation $currentCountFromSubList, " +
         s"ConcurrentUsers count $lastConcurrentViewersCount" +
         s" resetting to $lastConcurrentViewersCount"
     )
     currentCount = lastConcurrentViewersCount
   }
   val remainingValuesList = values.diff(subList)
   startValue = ConcurrentViewers(currentCount)
   currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count

   if (currentCount < 0) {
     logger.error(
       s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
     )
     currentCount = 0
   }
   // to stop pushing subsequent 0 after receiving first 0
   if (currentCount == 0 && previousCount == 0) None
   else Some(previousCount, currentCount)
 }

 trait IConcurrentUsers {
   val count: Long
   def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
 }

 object IConcurrentUsers {
   def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
     case (_, _: ConcurrentViewers) =>
       ConcurrentViewers(b.count)
     case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
       ConcurrentViewers(a.count + b.count)
     case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
       ConcurrentViewers(a.count - b.count)
   }
 }

 case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
 case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
 case class ConcurrentViewers(count: Long) extends IConcurrentUsers
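
One detail in the update function above may be worth checking: `lastIndexWhere` returns -1 when no element matches, so a batch without any `ConcurrentViewers` message would make `values(lastIndexOfConcurrentUsers)` index with -1 and throw. A minimal guarded sketch follows, with the posted types re-declared (without `op`) so it compiles on its own. `GuardedLookupSketch` and `splitAtLastSnapshot` are hypothetical names, not part of the application.

```scala
object GuardedLookupSketch {
  // Re-declaration of the posted message types, reduced to what the lookup needs.
  trait IConcurrentUsers { val count: Long }
  case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
  case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
  case class ConcurrentViewers(count: Long) extends IConcurrentUsers

  // Hypothetical helper: splits a batch at the last absolute ConcurrentViewers
  // reading and returns (messages before it, its count). Returns None when the
  // batch holds no such reading (including an empty batch) instead of
  // indexing with -1.
  def splitAtLastSnapshot(
      values: Seq[IConcurrentUsers]): Option[(Seq[IConcurrentUsers], Long)] = {
    val idx = values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
    if (idx < 0) None
    else Some((values.slice(0, idx), values(idx).count))
  }
}
```

The caller can then pattern match on the result and fall back to the previous count when it is `None`. Whether that fallback is the right business rule is an assumption that only the application owner can confirm.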


 Also, the error stack trace copied from the executor logs is:

 java.lang.OutOfMemoryError: Java heap space
     at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
     at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
     at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
     at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
     at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
     at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:601)
     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
     at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
     at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:601)
     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
 *at
 

Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-21 Thread Olivier Girardot
Hi Sourav,
Can you post your updateFunc as well, please?

Regards,

Olivier.

On Tue, Apr 21, 2015 at 12:48, Sourav Chandra sourav.chan...@livestream.com
wrote:

 Hi,

 We are building a Spark Streaming application which reads from Kafka, runs
 updateStateByKey based on the received message type, and finally stores the
 result in Redis.

 After running for a few seconds, the executor process is killed with an
 OutOfMemoryError.

 The code snippet is below:


 val NoOfReceiverInstances = 1

 val kafkaStreams = (1 to NoOfReceiverInstances).map(
   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
 )
 val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}

 ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...)..updateStateByKey(updateFunc).foreachRDD(_.foreachPartition(RedisHelper.update(_)))

 object RedisHelper {
   private val client = scredis.Redis(
     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
   )

   def update(itr: Iterator[(String, (Long, Long))]) {
     // redis save operation
   }
 }
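
For readers less familiar with updateStateByKey, its per-batch behaviour can be modelled in plain Scala without Spark. In every batch the update function runs for each key that has either existing state or new values, so keys with state but no new data receive an empty `Seq`, and a key is dropped when the function returns `None`. The names `UpdateStateModel`, `step`, and `sumUpdate` below are hypothetical, and this is only a single-machine sketch of the semantics, not how Spark implements them.

```scala
object UpdateStateModel {
  // One batch step: apply `update` to every key that has state or new values.
  def step[K, V, S](
      state: Map[K, S],
      batch: Seq[(K, V)],
      update: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    // New values of this batch, grouped per key.
    val grouped: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val keys = (state.keySet ++ grouped.keySet).toSeq
    keys.flatMap { k =>
      // Keys without new data get an empty Seq; a None result drops the key.
      update(grouped.getOrElse(k, Seq.empty[V]), state.get(k)).map(s => k -> s).toList
    }.toMap
  }

  // Hypothetical running-sum update that drops a key once its total reaches zero.
  val sumUpdate: (Seq[Long], Option[Long]) => Option[Long] = (values, prev) => {
    val total = prev.getOrElse(0L) + values.sum
    if (total == 0L) None else Some(total)
  }
}
```

Starting from an empty state, a batch of `Seq("a" -> 2L, "b" -> 1L, "a" -> 3L)` yields `Map("a" -> 5L, "b" -> 1L)`. A following batch of `Seq("b" -> -1L)` still calls the update for "a" with an empty `Seq` and removes "b", leaving `Map("a" -> 5L)`. This is why state only shrinks when the update function explicitly returns `None`.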


 Below is the Spark configuration:

 spark.app.name = XXX
 spark.jars = .jar
 spark.home = /spark-1.1.1-bin-hadoop2.4
 spark.executor.memory = 1g
 spark.streaming.concurrentJobs = 1000
 spark.logConf = true
 spark.cleaner.ttl = 3600 //in milliseconds
 spark.default.parallelism = 12
 spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError
 spark.executor.logs.rolling.strategy = size
 spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
 spark.executor.logs.rolling.maxRetainedFiles = 10
 spark.serializer = org.apache.spark.serializer.KryoSerializer
 spark.kryo.registrator = xxx.NoOpKryoRegistrator


 Other configurations are below:

 streaming {
   // All streaming context related configs should come here
   batch-duration = 1 second
   checkpoint-directory = /tmp
   checkpoint-duration = 10 seconds
   slide-duration = 1 second
   window-duration = 1 second
   partitions-for-shuffle-task = 32
 }
 kafka {
   no-of-receivers = 1
   zookeeper-quorum = :2181
   consumer-group = x
   topic = x:2
 }

 We tried different combinations, such as:
  - running with Spark 1.1.0 and 1.1.1
  - increasing executor memory
  - changing the serialization strategy (switching between Kryo and plain
 Java serialization)
  - changing the broadcast strategy (switching between HTTP and torrent
 broadcast)


 Can anyone give any insight into what we are missing here? How can we fix this?

 Due to an Akka version mismatch with some other libraries, we cannot upgrade
 the Spark version.

 Thanks,
 --

 Sourav Chandra
