Hi Iain,

Thanks for your reply. Actually, I changed my trackStateFunc and it's
working now.

For reference, here is my working code with mapWithState:


def trackStateFunc(batchTime: Time, key: String, value: Option[Array[Long]],
                   state: State[Array[Long]]): Option[(String, Array[Long])] = {
  // Check if state exists
  if (state.exists) {
    // Element-wise sum of the previous state and the new value,
    // e.g. Array(1, 2) and Array(10, 20) merge to Array(11, 22)
    val newState: Array[Long] = Array(state.get, value.get).transpose.map(_.sum)
    state.update(newState)     // Set the new state
    Some((key, newState))
  } else {
    val initialState = value.get
    state.update(initialState) // Set the initial state
    Some((key, initialState))
  }
}

// StateSpec[KeyType, ValueType, StateType, MappedType]
val stateSpec: StateSpec[String, Array[Long], Array[Long], (String, Array[Long])] =
  StateSpec.function(trackStateFunc _)

val state: MapWithStateDStream[String, Array[Long], Array[Long], (String, Array[Long])] =
  parsedStream.mapWithState(stateSpec)
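
For completeness, a minimal sketch of how this plugs into the streaming
context (the app name, batch interval, and checkpoint path below are
placeholders rather than my real config; parsedStream is the
DStream[(String, Array[Long])] from earlier in my job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CoverageStream")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/tmp/coverage-checkpoint") // mapWithState needs checkpointing

val state = parsedStream.mapWithState(stateSpec)
state.stateSnapshots().print() // full key -> state table after each batch

ssc.start()
ssc.awaitTermination()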


Thanks & Regards,

Vinti


On Mon, Mar 14, 2016 at 7:06 AM, Iain Cundy <iain.cu...@amdocs.com> wrote:

> Hi Vinti
>
> I don't program in Scala, but I think you've changed the meaning of the
> current variable: look again at what is state and what is new data.
>
> Assuming it works like the Java API, to maintain state with this function
> you must call State.update; the function itself can return anything, not
> just the state.
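>
> Something like this, maybe (an untested sketch on my part, assuming the
> Scala API mirrors the Java one I know):
>
> def mappingFunc(key: String, value: Option[Long], state: State[Long]):
>     Option[(String, Long)] = {
>   val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
>   state.update(sum) // this call is what actually persists the state
>   Some((key, sum))  // the returned value is just the mapped output;
>                     // it could be anything
> }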
>
> Cheers
>
> Iain
>
> *From:* Vinti Maheshwari [mailto:vinti.u...@gmail.com]
> *Sent:* 12 March 2016 22:10
> *To:* user
> *Subject:* [MARKETING] Spark Streaming stateful transformation
> mapWithState function getting error scala.MatchError: [Ljava.lang.Object]
>
> Hi All,
>
> I wanted to replace my updateStateByKey function with the mapWithState
> function (Spark 1.6) to improve the performance of my program.
>
> I was following these two documents:
> https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html
>
>
> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html
>
> but I am getting the error *scala.MatchError: [Ljava.lang.Object]*
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 71.0 failed 4 times, most recent failure: Lost task 0.3 in stage 71.0 (TID 88, ttsv-lab-vmdb-01.englab.juniper.net): scala.MatchError: [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)
>
> at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
> at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
> at scala.Option.flatMap(Option.scala:170)
> at HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)
>
> Reference code:
>
> def trackStateFunc(key: String, value: Option[Array[Long]], current: State[Array[Long]]) = {
>
>   // either we can use this
>   // current.update(value)
>
>   value.map(_ :+ current).orElse(Some(current)).flatMap {
>     case x: Array[Long] => Try(x.map(BDV(_)).reduce(_ + _).toArray).toOption
>     case None => ???
>   }
> }
>
> val statespec: StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] =
>   StateSpec.function(trackStateFunc _)
>
> val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]] =
>   parsedStream.mapWithState(statespec)
>
> My previous working code, which used the updateStateByKey function:
>
> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>   (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>     prev.map(_ +: current).orElse(Some(current))
>       .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>   })
>
> Does anyone have an idea what the issue could be?
>
> Thanks & Regards,
>
> Vinti
