Hi,

In my use case I need to maintain history data for each key. For this I am
using updateStateByKey, with the state maintained as a mutable Scala
collection (ArrayBuffer). Each element in the ArrayBuffer is an incoming
record. The Spark version is 1.6.

As the number of elements (records) in the ArrayBuffer for a key increases,
I am getting a StackOverflowError:
16/03/28 07:31:55 ERROR scheduler.JobScheduler: Error running job streaming job 1459150304000 ms.2
java.lang.StackOverflowError
        at scala.collection.immutable.StringOps.stripSuffix(StringOps.scala:31)
        at org.apache.spark.Logging$class.logName(Logging.scala:44)
        at org.apache.spark.rdd.RDD.logName(RDD.scala:74)
        at org.apache.spark.Logging$class.log(Logging.scala:51)
        at org.apache.spark.rdd.RDD.log(RDD.scala:74)
        at org.apache.spark.Logging$class.logDebug(Logging.scala:62)
        at org.apache.spark.rdd.RDD.logDebug(RDD.scala:74)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$getDependencies$1.apply(CoGroupedRDD.scala:104)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$getDependencies$1.apply(CoGroupedRDD.scala:99)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.rdd.CoGroupedRDD.getDependencies(CoGroupedRDD.scala:99)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:226)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:224)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:224)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:117)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:115)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1.apply$mcVI$sp(CoGroupedRDD.scala:115)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.rdd.CoGroupedRDD.getPartitions(CoGroupedRDD.scala:113)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:121)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:115)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

The recursive RDD.dependencies/getPartitions frames in the trace suggest the
RDD lineage keeps growing across batches. Following is the code snippet:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

// Appends the first Row of each incoming buffer to the accumulated
// history (state) for the key.
def updateState(rows: Seq[ArrayBuffer[Row]],
                state: Option[ArrayBuffer[Row]]): Option[ArrayBuffer[Row]] = {
  val prevState = state.getOrElse(ArrayBuffer.empty[Row])

  // Copy the previous history, then append the new records.
  val newState = ArrayBuffer.empty[Row]
  newState ++= prevState
  for (r <- rows) {
    newState += r(0)
  }
  Some(newState)
}

val pairedFaultStream = getPairedStream(faultStream, sqlContext)
val workingStream = pairedFaultStream
  .updateStateByKey[ArrayBuffer[Row]](updateState _)
  .map(_._2)

I have tried the following approaches:
1. truncating the lineage by caching and checkpointing the RDDs of
*workingStream* (see the sketch after this list)
2. using Kryo serialization (also sketched below)
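
Roughly, the two attempts were wired as below. This is a minimal sketch: the
checkpoint directory, the 100-second checkpoint interval, and `ssc` (my
StreamingContext) are placeholders, not the exact values from my job.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.streaming.Seconds

// Attempt 1: enable checkpointing so Spark can periodically persist the
// state RDDs and cut off the growing lineage. Path and interval below
// are placeholders.
ssc.checkpoint("hdfs:///tmp/checkpoints")  // required for updateStateByKey
workingStream.cache()
workingStream.checkpoint(Seconds(100))     // e.g. several batch intervals

// Attempt 2: switch to Kryo serialization and register the state classes.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[ArrayBuffer[Row]], classOf[Row]))

Even so, the StackOverflowError still occurs as the per-key history grows.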

Any suggestions would be appreciated.

- Thanks
Vikash 



