Let me know if you do need a pull request for this; I can make that happen
(given someone gives the PR a quick review to make sure I'm understanding this
problem right).

On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> Thanks for reporting it. I will take a look.
>
> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yuva...@gmail.com> wrote:
>
>> Hi,
>> I've been playing with the experimental PairDStreamFunctions.mapWithState
>> feature and I seem to have stumbled across a bug, and was wondering if
>> anyone else has seen this behavior.
>>
>> I've opened an issue in the Spark JIRA; I simply want to pass this along
>> in case anyone else is experiencing such a failure, or in case someone has
>> insight into whether this is actually a bug:  SPARK-13195
>> <https://issues.apache.org/jira/browse/SPARK-13195>
>>
>> Using the new Spark mapWithState API, I've encountered a bug when setting
>> a timeout for mapWithState without any explicit state handling.
>>
>> h1. Steps to reproduce:
>>
>> 1. Create a method which conforms to the StateSpec signature, and make sure
>> not to update any state inside it using *state.update*. Simply create a "pass
>> through" method; it may even be empty.
>> 2. Create a StateSpec object with the method from step 1, which explicitly
>> sets a timeout using the *StateSpec.timeout* method.
>> 3. Create a DStream pipeline that uses mapWithState with the given
>> StateSpec.
>> 4. Run the code using spark-submit. You'll see that the job ends up throwing
>> the following exception:
>>
>> {code}
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
>>         at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> {code}
>>
>> h1. Sample code to reproduce the issue:
>>
>> {code:Title=MainObject}
>> import org.apache.spark.streaming._
>> import org.apache.spark.{SparkConf, SparkContext}
>> /**
>>   * Created by yuvali on 04/02/2016.
>>   */
>> object Program {
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>     val sparkContext = new SparkContext(sc)
>>
>>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>>
>>     // Create a dummy stream that generates 10 lines per second
>>     val stream = ssc.receiverStream(new DummySource(10))
>>
>>     // Split the lines into words, and create a paired (key-value) dstream
>>     val wordStream = stream.flatMap {
>>       _.split(" ")
>>     }.map(word => (word, 1))
>>
>>     // This represents the emitted stream from the trackStateFunc. Since we
>>     // emit every input record with the updated value, this stream will
>>     // contain the same # of records as the input dstream.
>>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>     wordCountStateStream.print()
>>
>>     // To make sure data is not deleted by the time we query it interactively
>>     ssc.remember(Minutes(1))
>>
>>     // Don't forget to set checkpoint directory
>>     ssc.checkpoint("")
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>>
>>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>>                      state: State[Long]): Option[(String, Long)] = {
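>>     // Note: state.update(...) is deliberately never called here; leaving the
>>     // state unset while a timeout is configured is what triggers the exception.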
>>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>     val output = (key, sum)
>>     Some(output)
>>   }
>> }
>> {code}
>>
>> {code:Title=DummySource}
>>
>> /**
>>   * Created by yuvali on 04/02/2016.
>>   */
>>
>> import org.apache.spark.storage.StorageLevel
>> import scala.util.Random
>> import org.apache.spark.streaming.receiver._
>>
>> class DummySource(ratePerSec: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>>
>>   def onStart() {
>>     // Start the thread that receives data over a connection
>>     new Thread("Dummy Source") {
>>       override def run() { receive() }
>>     }.start()
>>   }
>>
>>   def onStop() {
>>     // There is nothing much to do, as the thread calling receive()
>>     // is designed to stop by itself once isStopped() returns true
>>   }
>>
>>   /** Generate dummy data until the receiver is stopped */
>>   private def receive() {
>>     while(!isStopped()) {
>>       store("I am a dummy source " + Random.nextInt(10))
>>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>>     }
>>   }
>> }
>> {code}
>>
>> The issue resides in *MapWithStateRDDRecord.updateRecordWithData*, starting
>> at line 55, in the following code block:
>>
>> {code}
>> dataIterator.foreach { case (key, value) =>
>>       wrappedState.wrap(newStateMap.get(key))
>>       val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>>       if (wrappedState.isRemoved) {
>>         newStateMap.remove(key)
>>       } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) { /* <--- problem is here */
>>         newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>       }
>>       mappedData ++= returned
>> }
>> {code}
>>
>> If the stream has a timeout set but the state was never set, the "else-if"
>> branch still executes, because the timeout is defined even though
>> "wrappedState" is empty, so wrappedState.get() throws.
>>
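>> For context, the exception itself comes from StateImpl.get (State.scala:150
>> in the trace above). A rough sketch of what that method presumably does,
>> reconstructed from the stack trace rather than copied from the Spark source:
>>
>> {code}
>> // Reconstructed sketch of StateImpl.get, based only on the stack trace;
>> // the real Spark source may differ in its details.
>> def get(): S = {
>>   if (defined) {
>>     state
>>   } else {
>>     throw new NoSuchElementException("State is not set")
>>   }
>> }
>> {code}
>>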
>> If it is mandatory to update the state for each entry of *mapWithState*, then
>> this code should throw a better exception than "NoSuchElementException",
>> which doesn't really say anything to the developer.
>>
>> I haven't provided a fix myself because I'm not familiar with the Spark
>> implementation, but it seems there either needs to be an extra check for
>> whether the state is set, or, as stated above, a better exception message.
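>>
>> For what it's worth, below is a minimal sketch of the kind of extra check I
>> have in mind. It assumes the internal wrappedState exposes an exists check
>> like the public State.exists API does; this is only an untested illustration
>> of the idea, not a proposed patch:
>>
>> {code}
>> dataIterator.foreach { case (key, value) =>
>>       wrappedState.wrap(newStateMap.get(key))
>>       val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>>       if (wrappedState.isRemoved) {
>>         newStateMap.remove(key)
>>       } else if (wrappedState.isUpdated ||
>>           (wrappedState.exists && timeoutThresholdTime.isDefined)) {
>>         // Only persist the state when it was explicitly updated, or when an
>>         // existing state needs its timestamp refreshed for timeout tracking.
>>         newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>       }
>>       mappedData ++= returned
>> }
>> {code}
>>
>> With such a guard, a pass-through mapping function that never sets state
>> would simply skip the put instead of calling wrappedState.get() on an empty
>> state.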
>>
>>
>>
>>
>>
>>
>


-- 
Best Regards,
Yuval Itzchakov.
