Hi All,

I wanted to replace my updateStateByKey function with the mapWithState function
(Spark 1.6) to improve the performance of my program.

I was following these two documents:
https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html

but I am getting the error *scala.MatchError: [Ljava.lang.Object;*:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 71.0 failed 4 times, most recent failure: Lost task 0.3 in stage 71.0 (TID 88, ttsv-lab-vmdb-01.englab.juniper.net): scala.MatchError: [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)
    at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
    at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
    at scala.Option.flatMap(Option.scala:170)
    at HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)

Reference code:

def trackStateFunc(key: String, value: Option[Array[Long]],
                   current: State[Array[Long]]) = {

  // Alternatively, we could use this:
  // current.update(value)

  value.map(_ :+ current).orElse(Some(current)).flatMap {
    case x: Array[Long] => Try(x.map(BDV(_)).reduce(_ + _).toArray).toOption
    case None => ???
  }
}

val statespec: StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] =
  StateSpec.function(trackStateFunc _)

val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]] =
  parsedStream.mapWithState(statespec)
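A plain-Scala check, without Spark or Breeze, suggests where the [Ljava.lang.Object; in the trace may come from. The expression _ :+ current appends the State object itself to the Array[Long]. That widens the element type to Any, which is an Object[] at runtime, so it matches neither case x: Array[Long] nor case None. StateStandIn below is a hypothetical placeholder for State[Array[Long]]. The sketch only demonstrates this type behaviour; it is not the Spark job itself.

```scala
object MatchErrorDemo {
  // Hypothetical placeholder for Spark's State[Array[Long]];
  // any non-Long object behaves the same way here.
  final class StateStandIn

  // Same shape as the original expression. Appending a StateStandIn to an
  // Array[Long] widens the element type to Any, i.e. an Object[] at runtime.
  def widened(value: Option[Array[Long]], current: StateStandIn): Option[Any] =
    value.map(_ :+ current).orElse(Some(current))

  // Mirrors the original cases, plus a fallback that reports the runtime
  // class instead of throwing scala.MatchError.
  def describe(x: Any): String = x match {
    case _: Array[Long] => "Array[Long]"
    case None           => "None"
    case other          => "no match: " + other.getClass.getName
  }

  def main(args: Array[String]): Unit = {
    val result = widened(Some(Array(1L, 2L)), new StateStandIn)
    result.foreach(r => println(describe(r))) // prints: no match: [Ljava.lang.Object;
  }
}
```

When value is None, the orElse branch returns the State object itself. That value would also fall through both cases.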

My previous working code, which used updateStateByKey:

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })
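For comparison, here is a sketch of a mapWithState-style mapping function that keeps the element-wise sum of the updateStateByKey version above. Two differences seem relevant. First, mapWithState passes one Option value per call, not a Seq. Second, the state has to be read with getOption() and stored with update() explicitly. The sketch rests on two assumptions so that it runs without Spark or Breeze:

- SimpleState is a hypothetical stand-in for org.apache.spark.streaming.State.
- addArrays is a hypothetical replacement for the BDV addition.

```scala
object MapWithStateSketch {
  // Hypothetical stand-in for org.apache.spark.streaming.State, modelling
  // only the getOption() and update() calls used below.
  final class SimpleState[S] {
    private var stored: Option[S] = None
    def getOption(): Option[S] = stored
    def update(newState: S): Unit = { stored = Some(newState) }
  }

  // Element-wise sum, replacing the Breeze BDV addition of the original.
  // Like BDV addition, it requires both arrays to have the same length.
  def addArrays(a: Array[Long], b: Array[Long]): Array[Long] = {
    require(a.length == b.length, "arrays must have the same length")
    a.zip(b).map { case (x, y) => x + y }
  }

  // Same parameter order as the three-argument StateSpec.function form:
  // (key, new value for this key, state). Returns the updated sum.
  def trackStateFunc(key: String, value: Option[Array[Long]],
                     state: SimpleState[Array[Long]]): Option[Array[Long]] =
    value match {
      case Some(v) =>
        // Add the new value to the stored state, if any, and store the
        // result explicitly; mapWithState does not do this automatically.
        val sum = state.getOption().map(prev => addArrays(prev, v)).getOrElse(v)
        state.update(sum)
        Some(sum)
      case None =>
        // Assumption: in Spark a None value is passed when a key's state is
        // timing out, and update() is not allowed then, so only read here.
        state.getOption()
    }

  def main(args: Array[String]): Unit = {
    val state = new SimpleState[Array[Long]]
    trackStateFunc("k", Some(Array(1L, 2L)), state)
    val out = trackStateFunc("k", Some(Array(10L, 20L)), state)
    println(out.map(_.mkString(","))) // prints: Some(11,22)
  }
}
```

With Spark on the classpath, the same body should carry over with State[Array[Long]] in place of SimpleState. It can then be passed to StateSpec.function(trackStateFunc _) as in the reference code. Its Option[Array[Long]] return type matches the MappedType declared in that StateSpec.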

Does anyone have an idea what the issue might be?

Thanks & Regards,
Vinti
