Hi Iain,
Thanks for your reply. Actually I changed my trackStateFunc and it's working now. For reference, my working code with mapWithState:

  def trackStateFunc(batchTime: Time,
                     key: String,
                     value: Option[Array[Long]],
                     state: State[Array[Long]]): Option[(String, Array[Long])] = {
    // Check if state exists for this key
    if (state.exists) {
      // Element-wise sum of the existing state and the newly arrived values
      val newState: Array[Long] = Array(state.get, value.get).transpose.map(_.sum)
      state.update(newState)        // Set the new state
      Some((key, newState))
    } else {
      // First batch for this key: the incoming values become the initial state
      val initialState = value.get
      state.update(initialState)    // Set the initial state
      Some((key, initialState))
    }
  }

  // StateSpec[KeyType, ValueType, StateType, MappedType]
  val stateSpec: StateSpec[String, Array[Long], Array[Long], (String, Array[Long])] =
    StateSpec.function(trackStateFunc _)

  val state: MapWithStateDStream[String, Array[Long], Array[Long], (String, Array[Long])] =
    parsedStream.mapWithState(stateSpec)
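In case it helps anyone reading this later, below is a minimal sketch of how a stateSpec like the one above can be wired into a streaming job. The app name, 10-second batch interval, checkpoint path and the localhost:9999 socket source with comma-separated input are all placeholders, just stand-ins for however parsedStream is really built; the one genuine requirement is that mapWithState needs a checkpoint directory.

  // Minimal wiring sketch (placeholder names/source) showing where the
  // stateSpec defined above fits into a Spark 1.6 streaming application.
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream}

  val conf = new SparkConf().setAppName("mapWithState-sketch")
  val ssc  = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("/tmp/mapwithstate-checkpoint")   // mapWithState requires checkpointing

  // Placeholder input: lines like "key,1,2,3" become (key, Array(1L, 2L, 3L))
  val parsedStream: DStream[(String, Array[Long])] =
    ssc.socketTextStream("localhost", 9999).map { line =>
      val fields = line.split(",")
      (fields.head, fields.tail.map(_.toLong))
    }

  // Same call as above, renamed here to avoid clashing with `state`
  val stateStream: MapWithStateDStream[String, Array[Long], Array[Long], (String, Array[Long])] =
    parsedStream.mapWithState(stateSpec)

  stateStream.print()                      // mapped records for each batch
  // stateStream.stateSnapshots().print()  // or the full accumulated state each batch

  ssc.start()
  ssc.awaitTermination()

stateStream.print() only shows the records mapped in each batch; stateStream.stateSnapshots() is useful when you want the full accumulated state every batch, including keys with no new data.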
Thanks & Regards,
Vinti

On Mon, Mar 14, 2016 at 7:06 AM, Iain Cundy <iain.cu...@amdocs.com> wrote:

> Hi Vinti
>
> I don't program in Scala, but I think you've changed the meaning of the
> current variable – look again at what is state and what is new data.
>
> Assuming it works like the Java API, to use this function to maintain
> state you must call State.update, while you can return anything, not just
> the state.
>
> Cheers
> Iain
>
> *From:* Vinti Maheshwari [mailto:vinti.u...@gmail.com]
> *Sent:* 12 March 2016 22:10
> *To:* user
> *Subject:* [MARKETING] Spark Streaming stateful transformation
> mapWithState function getting error scala.MatchError: [Ljava.lang.Object]
>
> Hi All,
>
> I wanted to replace my updateStateByKey function with the mapWithState
> function (Spark 1.6) to improve the performance of my program.
>
> I was following these two documents:
> https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html
> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html
>
> but I am getting the error *scala.MatchError: [Ljava.lang.Object]*
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
> stage 71.0 failed 4 times, most recent failure: Lost task 0.3 in stage 71.0
> (TID 88, ttsv-lab-vmdb-01.englab.juniper.net): scala.MatchError:
> [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)
>   at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
>   at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
>   at scala.Option.flatMap(Option.scala:170)
>   at HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)
>
> Reference code:
>
>   def trackStateFunc(key: String, value: Option[Array[Long]],
>                      current: State[Array[Long]]) = {
>
>     // either we can use this
>     // current.update(value)
>
>     value.map(_ :+ current).orElse(Some(current)).flatMap {
>       case x: Array[Long] => Try(x.map(BDV(_)).reduce(_ + _).toArray).toOption
>       case None => ???
>     }
>   }
>
>   val statespec: StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] =
>     StateSpec.function(trackStateFunc _)
>
>   val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]] =
>     parsedStream.mapWithState(statespec)
>
> My previous working code, which was using the updateStateByKey function:
>
>   val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>     (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>       prev.map(_ +: current).orElse(Some(current))
>         .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>     })
>
> Does anyone have an idea what the issue could be?
>
> Thanks & Regards,
>
> Vinti
>
> This message and the information contained herein is proprietary and
> confidential and subject to the Amdocs policy statement, which you may review at
> http://www.amdocs.com/email_disclaimer.asp