Awesome. Thanks for the super fast reply.

On Thu, Feb 4, 2016, 21:16 Tathagata Das <>

> Shixiong has already opened the PR -
> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <>
> wrote:
>> Let me know if you do need a pull request for this, I can make that
>> happen (given someone does a vast PR to make sure I'm understanding this
>> problem right).
>> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
>>> wrote:
>>> Thanks for reporting it. I will take a look.
>>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <>
>>> wrote:
>>>> Hi,
>>>> I've been playing with the experimental
>>>> PairDStreamFunctions.mapWithState feature and I seem to have stumbled
>>>> across a bug. I was wondering if anyone else has been seeing this
>>>> behavior.
>>>> I've opened an issue in the Spark JIRA (SPARK-13195). I simply want to
>>>> pass it along in case anyone else is experiencing this failure, or in
>>>> case someone can tell whether this is actually a bug.
>>>> Using the new spark mapWithState API, I've encountered a bug when
>>>> setting a
>>>> timeout for mapWithState but no explicit state handling.
>>>> h1. Steps to reproduce:
>>>> 1. Create a method which conforms to the StateSpec signature, and make
>>>> sure not to update any state inside it using *state.update*. Simply
>>>> create a "pass through" method; it may even be empty.
>>>> 2. Create a StateSpec object with the method from step 1, which
>>>> explicitly sets a timeout using the *StateSpec.timeout* method.
>>>> 3. Create a DStream pipeline that uses mapWithState with the given
>>>> StateSpec.
>>>> 4. Run the code using spark-submit. You'll see that the method ends up
>>>> throwing the following exception:
>>>> {code}
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 0 in
>>>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>>>> 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not
>>>> set
>>>>         at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>>>         at
>>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>>>         at
>>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>         at
>>>> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>>>         at
>>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>>>         at
>>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>         at
>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>>>         at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>>>         at
>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>         at
>>>>         at
>>>> org.apache.spark.executor.Executor$
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$
>>>>         at
>>>> {code}
>>>> h1. Sample code to reproduce the issue:
>>>> {code:Title=MainObject}
>>>> import org.apache.spark.streaming._
>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>> /**
>>>>   * Created by yuvali on 04/02/2016.
>>>>   */
>>>> object Program {
>>>>   def main(args: Array[String]): Unit = {
>>>>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>>>     val sparkContext = new SparkContext(sc)
>>>>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>>>>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>>>>     // Create a stream that generates 10 lines per second
>>>>     val stream = ssc.receiverStream(new DummySource(10))
>>>>     // Split the lines into words, and create a paired (key-value) dstream
>>>>     val wordStream = stream.flatMap {
>>>>       _.split(" ")
>>>>     }.map(word => (word, 1))
>>>>     // This represents the emitted stream from the trackStateFunc.
>>>>     // Since we emit every input record with the updated value,
>>>>     // this stream will contain the same # of records as the input dstream.
>>>>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>>>     wordCountStateStream.print()
>>>>     // To make sure data is not deleted by the time we query it interactively
>>>>     ssc.remember(Minutes(1))
>>>>     // Don't forget to set checkpoint directory
>>>>     ssc.checkpoint("")
>>>>     ssc.start()
>>>>     ssc.awaitTermination()
>>>>   }
>>>>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>>>> state: State[Long]): Option[(String, Long)] = {
>>>>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>>>     val output = (key, sum)
>>>>     Some(output)
>>>>   }
>>>> }
>>>> {code}
>>>> {code:Title=DummySource}
>>>> /**
>>>>   * Created by yuvali on 04/02/2016.
>>>>   */
>>>> import org.apache.spark.storage.StorageLevel
>>>> import scala.util.Random
>>>> import org.apache.spark.streaming.receiver._
>>>> class DummySource(ratePerSec: Int) extends
>>>> Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>>>>   def onStart() {
>>>>     // Start the thread that receives data over a connection
>>>>     new Thread("Dummy Source") {
>>>>       override def run() { receive() }
>>>>     }.start()
>>>>   }
>>>>   def onStop() {
>>>>     // There is nothing much to do as the thread calling receive()
>>>>     // is designed to stop by itself once isStopped() returns true
>>>>   }
>>>>   /** Generate dummy lines until the receiver is stopped */
>>>>   private def receive() {
>>>>     while(!isStopped()) {
>>>>       store("I am a dummy source " + Random.nextInt(10))
>>>>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>>>>     }
>>>>   }
>>>> }
>>>> {code}
>>>> The issue resides in *MapWithStateRDDRecord.updateRecordWithData*,
>>>> starting at line 55, in the following code block:
>>>> {code}
>>>> dataIterator.foreach { case (key, value) =>
>>>>       wrappedState.wrap(newStateMap.get(key))
>>>>       val returned = mappingFunction(batchTime, key, Some(value),
>>>> wrappedState)
>>>>       if (wrappedState.isRemoved) {
>>>>         newStateMap.remove(key)
>>>>       } else if (wrappedState.isUpdated ||
>>>>           timeoutThresholdTime.isDefined) /* <--- problem is here */ {
>>>>         newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>>>       }
>>>>       mappedData ++= returned
>>>> }
>>>> {code}
>>>> If the stream has a timeout set but the state was never set, the
>>>> "else if" branch is still taken because the timeout is defined. The
>>>> call to wrappedState.get() then throws, because "wrappedState" is
>>>> empty.
>>>> If it is mandatory to update state for each entry of *mapWithState*,
>>>> then this code should throw a better exception than
>>>> "NoSuchElementException", which doesn't really say anything to the
>>>> developer.
>>>> I haven't provided a fix myself because I'm not familiar with the Spark
>>>> implementation, but it seems to me there needs to be either an extra
>>>> check for whether the state is set or, as previously stated, a better
>>>> exception message.
>> --
>> Best Regards,
>> Yuval Itzchakov.
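
For anyone following along, the "extra check" suggested in the quoted report
can be sketched outside of Spark. This is only a simplified model of the loop
quoted above, not Spark's code and not necessarily what the PR does:
SketchState, GuardSketch, runBatch and the guarded flag are hypothetical names
made up for this illustration, and the isRemoved branch is left out.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-in for the wrapped state object; NOT Spark's StateImpl.
final class SketchState[S] {
  private var value: Option[S] = None
  private var updated = false

  def wrap(existing: Option[S]): Unit = { value = existing; updated = false }
  def exists: Boolean = value.isDefined
  def isUpdated: Boolean = updated
  def update(s: S): Unit = { value = Some(s); updated = true }
  // Mirrors the failure in the stack trace: get() on an unset state throws.
  def get(): S =
    value.getOrElse(throw new NoSuchElementException("State is not set"))
}

object GuardSketch {
  // Simplified model of the per-record loop from the report.
  // guarded = false reproduces the reported condition;
  // guarded = true additionally requires the state to exist.
  def runBatch(
      data: Seq[(String, Int)],
      stateMap: mutable.Map[String, Long],
      timeoutDefined: Boolean,
      guarded: Boolean)(f: (String, Int, SketchState[Long]) => Unit): Unit = {
    val wrapped = new SketchState[Long]
    data.foreach { case (key, value) =>
      wrapped.wrap(stateMap.get(key))
      f(key, value, wrapped)
      val refreshForTimeout =
        if (guarded) wrapped.exists && timeoutDefined else timeoutDefined
      if (wrapped.isUpdated || refreshForTimeout) {
        stateMap.put(key, wrapped.get())
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // A "pass through" function that never touches the state.
    val passThrough = (key: String, value: Int, state: SketchState[Long]) => ()
    val batch = Seq(("a", 1), ("b", 1))

    // Unguarded, with a timeout: get() is called on an unset state and throws.
    val unguarded = Try(
      runBatch(batch, mutable.Map.empty[String, Long],
        timeoutDefined = true, guarded = false)(passThrough))
    println(s"unguarded failed: ${unguarded.isFailure}")

    // Guarded, with a timeout: keys without state are simply skipped.
    val states = mutable.Map.empty[String, Long]
    runBatch(batch, states, timeoutDefined = true, guarded = true)(passThrough)
    println(s"guarded state map empty: ${states.isEmpty}")
  }
}
```

With the guard in place, a key whose state was never set is not written to the
state map, while a key that already has state still gets its entry refreshed
when a timeout is defined.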
