Awesome. Thanks for the super fast reply.

On Thu, Feb 4, 2016, 21:16 Tathagata Das <tathagata.das1...@gmail.com> wrote:
> Shixiong has already opened the PR:
> https://github.com/apache/spark/pull/11081
>
> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:
>
>> Let me know if you do need a pull request for this, I can make that
>> happen (given someone reviews it to make sure I'm understanding this
>> problem right).
>>
>> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>
>>> Thanks for reporting it. I will take a look.
>>>
>>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yuva...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I've been playing with the experimental
>>>> PairDStreamFunctions.mapWithState feature and I seem to have stumbled
>>>> across a bug. I was wondering if anyone else has been seeing this
>>>> behavior.
>>>>
>>>> I've opened an issue in the Spark JIRA; I simply want to pass this
>>>> along in case anyone else is experiencing such a failure, or perhaps
>>>> someone has insightful information on whether this is actually a bug:
>>>> SPARK-13195
>>>> <https://issues.apache.org/jira/browse/SPARK-13195>
>>>>
>>>> Using the new Spark mapWithState API, I've encountered a bug when
>>>> setting a timeout for mapWithState without explicitly updating the
>>>> state.
>>>>
>>>> h1. Steps to reproduce:
>>>>
>>>> 1. Create a method which conforms to the StateSpec signature, making
>>>> sure not to update any state inside it via *state.update*. Simply
>>>> create a "pass through" method; it may even be empty.
>>>> 2. Create a StateSpec object from the method in step 1, explicitly
>>>> setting a timeout using the *StateSpec.timeout* method.
>>>> 3. Create a DStream pipeline that uses mapWithState with the given
>>>> StateSpec.
>>>> 4. Run the code using spark-submit. You'll see that the job ends up
>>>> throwing the following exception:
>>>>
>>>> {code}
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
>>>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>>>> 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
>>>>     at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>     at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>>>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> {code}
>>>>
>>>> h1. Sample code to reproduce the issue:
>>>>
>>>> {code:Title=MainObject}
>>>> import org.apache.spark.streaming._
>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>
>>>> /**
>>>>   * Created by yuvali on 04/02/2016.
>>>>   */
>>>> object Program {
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>
>>>>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>>>     val sparkContext = new SparkContext(sc)
>>>>
>>>>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>>>>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>>>>
>>>>     // Create a stream that generates 10 lines per second
>>>>     val stream = ssc.receiverStream(new DummySource(10))
>>>>
>>>>     // Split the lines into words, and create a paired (key-value) dstream
>>>>     val wordStream = stream.flatMap {
>>>>       _.split(" ")
>>>>     }.map(word => (word, 1))
>>>>
>>>>     // This represents the emitted stream from the trackStateFunc. Since we
>>>>     // emit every input record with the updated value, this stream will
>>>>     // contain the same # of records as the input dstream.
>>>>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>>>     wordCountStateStream.print()
>>>>
>>>>     // To make sure data is not deleted by the time we query it interactively
>>>>     ssc.remember(Minutes(1))
>>>>
>>>>     // Don't forget to set a checkpoint directory
>>>>     ssc.checkpoint("")
>>>>
>>>>     ssc.start()
>>>>     ssc.awaitTermination()
>>>>   }
>>>>
>>>>   // Note: this deliberately never calls state.update -- a "pass through"
>>>>   // function, which is what triggers the bug when a timeout is set.
>>>>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>>>>                      state: State[Long]): Option[(String, Long)] = {
>>>>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>>>     val output = (key, sum)
>>>>     Some(output)
>>>>   }
>>>> }
>>>> {code}
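>>>>
>>>> As a side note, the failure does not occur if the mapping function
>>>> always sets the state. A minimal, untested sketch of a workaround
>>>> (same signature as the trackStateFunc above) that persists the state
>>>> on every record:
>>>>
>>>> {code:Title=Workaround sketch}
>>>> def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>>>>                    state: State[Long]): Option[(String, Long)] = {
>>>>   val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>>>   // Explicitly setting the state avoids the NoSuchElementException.
>>>>   // Guard with isTimingOut: updating a state that is timing out throws.
>>>>   if (!state.isTimingOut()) {
>>>>     state.update(sum)
>>>>   }
>>>>   Some((key, sum))
>>>> }
>>>> {code}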
>>>>
>>>> {code:Title=DummySource}
>>>> import org.apache.spark.storage.StorageLevel
>>>> import org.apache.spark.streaming.receiver._
>>>>
>>>> import scala.util.Random
>>>>
>>>> /**
>>>>   * Created by yuvali on 04/02/2016.
>>>>   */
>>>> class DummySource(ratePerSec: Int)
>>>>   extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>>>>
>>>>   def onStart() {
>>>>     // Start the thread that generates the data
>>>>     new Thread("Dummy Source") {
>>>>       override def run() { receive() }
>>>>     }.start()
>>>>   }
>>>>
>>>>   def onStop() {
>>>>     // There is nothing much to do, as the thread calling receive()
>>>>     // is designed to stop by itself once isStopped() returns true.
>>>>   }
>>>>
>>>>   /** Generate dummy data until the receiver is stopped. */
>>>>   private def receive() {
>>>>     while (!isStopped()) {
>>>>       store("I am a dummy source " + Random.nextInt(10))
>>>>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>>>>     }
>>>>   }
>>>> }
>>>> {code}
>>>>
>>>> The issue resides in *MapWithStateRDDRecord.updateRecordWithData*,
>>>> starting at line 55, in the following code block:
>>>>
>>>> {code}
>>>> dataIterator.foreach { case (key, value) =>
>>>>   wrappedState.wrap(newStateMap.get(key))
>>>>   val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>>>>   if (wrappedState.isRemoved) {
>>>>     newStateMap.remove(key)
>>>>   } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) { /* <--- problem is here */
>>>>     newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>>>   }
>>>>   mappedData ++= returned
>>>> }
>>>> {code}
>>>>
>>>> If the stream has a timeout set but the state was never set at all, the
>>>> "else if" branch is still taken, because the timeout is defined even
>>>> though "wrappedState" is empty, and wrappedState.get() then throws.
>>>>
>>>> If it is mandatory to update the state for each entry of *mapWithState*,
>>>> then this code should throw a better exception than
>>>> "NoSuchElementException", which doesn't really say anything to the
>>>> developer.
>>>>
>>>> I haven't provided a fix myself because I'm not familiar with the Spark
>>>> implementation, but it seems there needs to either be an extra check
>>>> whether the state is set, or, as previously stated, a better exception
>>>> message.
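>>>>
>>>> To illustrate the kind of extra check I mean (a sketch of the idea
>>>> only, not a tested patch; it assumes the wrapped State exposes its
>>>> exists() check), the condition could additionally require that the
>>>> state actually exists before persisting it:
>>>>
>>>> {code}
>>>> } else if (wrappedState.isUpdated
>>>>     || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
>>>>   // Only touch the state map when there is actually a state to persist.
>>>>   newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>>> }
>>>> {code}
>>>>
>>>> With a guard like that, a pass-through mapping function combined with a
>>>> timeout would simply leave the state map untouched instead of crashing.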
>>>>
>>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>