Thanks for reporting it. I will take a look.

On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yuva...@gmail.com> wrote:

> Hi,
> I've been playing with the experimental PairDStreamFunctions.mapWithState
> feature and I seem to have stumbled across a bug. I was wondering if
> anyone else has been seeing this behavior.
>
> I've opened an issue in the Spark JIRA and simply want to pass it along
> in case anyone else is experiencing this failure, or in case someone has
> insight into whether this is actually a bug: SPARK-13195
> <https://issues.apache.org/jira/browse/SPARK-13195>
>
> Using the new Spark mapWithState API, I've encountered a bug when a timeout
> is set for mapWithState but the mapping function never updates the state.
>
> h1. Steps to reproduce:
>
> 1. Create a method that conforms to the StateSpec signature, and make sure
> it does not update any state using *state.update*. Simply create a "pass
> through" method; it may even be empty.
> 2. Create a StateSpec object with the method from step 1, and explicitly
> set a timeout using the *StateSpec.timeout* method.
> 3. Create a DStream pipeline that uses mapWithState with the given
> StateSpec.
> 4. Run the code using spark-submit. You'll see that the method ends up
> throwing the following exception:
>
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
>         at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
>
> h1. Sample code to reproduce the issue:
>
> {code:Title=MainObject}
> import org.apache.spark.streaming._
> import org.apache.spark.{SparkConf, SparkContext}
> /**
>   * Created by yuvali on 04/02/2016.
>   */
> object Program {
>
>   def main(args: Array[String]): Unit = {
>
>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>     val sparkContext = new SparkContext(sc)
>
>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>
>     // Create a stream that generates 10 lines per second
>     val stream = ssc.receiverStream(new DummySource(10))
>
>     // Split the lines into words, and create a paired (key-value) dstream
>     val wordStream = stream.flatMap {
>       _.split(" ")
>     }.map(word => (word, 1))
>
>     // This represents the emitted stream from the trackStateFunc. Since we
>     // emit every input record with the updated value, this stream will
>     // contain the same number of records as the input dstream.
>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>     wordCountStateStream.print()
>
>     // To make sure data is not deleted by the time we query it interactively
>     ssc.remember(Minutes(1))
>
>     // Don't forget to set checkpoint directory
>     ssc.checkpoint("")
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
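>   // Reads the state but never calls state.update; combined with the
>   // timeout set on the StateSpec, this triggers the failure.
>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>       state: State[Long]): Option[(String, Long)] = {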
>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>     val output = (key, sum)
>     Some(output)
>   }
> }
> {code}
>
> {code:Title=DummySource}
>
> /**
>   * Created by yuvali on 04/02/2016.
>   */
>
> import org.apache.spark.storage.StorageLevel
> import scala.util.Random
> import org.apache.spark.streaming.receiver._
>
> class DummySource(ratePerSec: Int)
>   extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>
>   def onStart() {
>     // Start the thread that receives data over a connection
>     new Thread("Dummy Source") {
>       override def run() { receive() }
>     }.start()
>   }
>
>   def onStop() {
>     // There is nothing much to do, as the thread calling receive()
>     // is designed to stop by itself once isStopped() returns true
>   }
>
>   /** Generate dummy lines and store them until the receiver is stopped */
>   private def receive() {
>     while(!isStopped()) {
>       store("I am a dummy source " + Random.nextInt(10))
>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>     }
>   }
> }
> {code}
>
> The issue resides in *MapWithStateRDDRecord.updateRecordWithData*, starting
> at line 55, in the following code block:
>
> {code}
> dataIterator.foreach { case (key, value) =>
>       wrappedState.wrap(newStateMap.get(key))
>       val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>       if (wrappedState.isRemoved) {
>         newStateMap.remove(key)
>       } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) /* <--- problem is here */ {
>         newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>       }
>       mappedData ++= returned
> }
> {code}
>
> If the stream has a timeout set but the state was never set, the "else if"
> branch is still taken because the timeout is defined. The call to
> wrappedState.get() then throws, because "wrappedState" is empty.
>
> If it is mandatory to update state for each entry of *mapWithState*, then
> this code should throw a better exception than "NoSuchElementException",
> which doesn't really say anything to the developer.
>
> I haven't provided a fix myself because I'm not familiar with the Spark
> implementation, but it seems to me there needs to be either an extra check
> for whether the state is set or, as previously stated, a better exception
> message.
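
To make the "extra check" concrete, here is a minimal sketch in plain Scala
with no Spark dependency. SimpleState and GuardSketch.updateRecords are
hypothetical stand-ins for the StateImpl / updateRecordWithData logic quoted
above, not Spark's actual code, and batchTimeMs / timeoutEnabled are
simplified substitutes for batchTime and timeoutThresholdTime. The only
intended change is that the timeout branch also requires that a state exists
before calling get().

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's internal StateImpl.
final class SimpleState[S] {
  private var value: Option[S] = None
  private var updated = false
  private var removed = false

  // Reset the wrapper for the next key, loading any previously stored state.
  def wrap(existing: Option[S]): Unit = {
    value = existing; updated = false; removed = false
  }
  def exists: Boolean = value.isDefined
  def getOption: Option[S] = value
  // Mirrors the behavior seen in the stack trace: throws when nothing is set.
  def get(): S =
    value.getOrElse(throw new NoSuchElementException("State is not set"))
  def update(s: S): Unit = { value = Some(s); updated = true }
  def remove(): Unit = { value = None; removed = true }
  def isUpdated: Boolean = updated
  def isRemoved: Boolean = removed
}

object GuardSketch {
  // Hypothetical helper that follows the shape of the quoted loop.
  // stateMap stores (state, lastUpdatedMs) per key.
  def updateRecords[K, V, S, M](
      data: Iterator[(K, V)],
      stateMap: mutable.Map[K, (S, Long)],
      batchTimeMs: Long,
      timeoutEnabled: Boolean)(
      mappingFunction: (K, Option[V], SimpleState[S]) => Option[M]): Seq[M] = {
    val wrappedState = new SimpleState[S]
    val mapped = mutable.ArrayBuffer.empty[M]
    data.foreach { case (key, value) =>
      wrappedState.wrap(stateMap.get(key).map(_._1))
      val returned = mappingFunction(key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        stateMap.remove(key)
      } else if (wrappedState.isUpdated ||
          (wrappedState.exists && timeoutEnabled)) {
        // Extra `exists` check: only refresh the timestamp of a state that
        // is actually set, so get() cannot throw here.
        stateMap.put(key, (wrappedState.get(), batchTimeMs))
      }
      mapped ++= returned
    }
    mapped.toSeq
  }
}
```

With this guard, a pass-through mapping function that never calls update
leaves the state map untouched instead of throwing, while a function that
does call update still has its state stored and its timestamp refreshed.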
