Shixiong has already opened the PR - https://github.com/apache/spark/pull/11081
On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:

> Let me know if you do need a pull request for this; I can make that happen
> (given someone reviews it to make sure I'm understanding this problem
> right).
>
> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
>> Thanks for reporting it. I will take a look.
>>
>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yuva...@gmail.com> wrote:
>>
>>> Hi,
>>> I've been playing with the experimental PairDStreamFunctions.mapWithState
>>> feature and I seem to have stumbled across a bug. I was wondering if
>>> anyone else has been seeing this behavior.
>>>
>>> I've opened an issue in the Spark JIRA; I simply want to pass this along
>>> in case anyone else is experiencing such a failure, or perhaps someone
>>> has insight into whether this is actually a bug: SPARK-13195
>>> <https://issues.apache.org/jira/browse/SPARK-13195>
>>>
>>> Using the new Spark mapWithState API, I've encountered a bug when setting
>>> a timeout for mapWithState without any explicit state handling.
>>>
>>> h1. Steps to reproduce:
>>>
>>> 1. Create a method which conforms to the StateSpec signature, making sure
>>> not to update any state inside it using *state.update*. Simply create a
>>> "pass through" method; it may even be empty.
>>> 2. Create a StateSpec object with the method from step 1, which
>>> explicitly sets a timeout using the *StateSpec.timeout* method.
>>> 3. Create a DStream pipeline that uses mapWithState with the given
>>> StateSpec.
>>> 4. Run the code using spark-submit. You'll see that the method ends up
>>> throwing the following exception:
>>>
>>> {code}
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
>>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>>> 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
>>>   at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>   at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>>   at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>>   at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   at java.lang.Thread.run(Thread.java:745)
>>> {code}
>>> h1. Sample code to reproduce the issue:
>>>
>>> {code:Title=MainObject}
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.{SparkConf, SparkContext}
>>>
>>> /**
>>>  * Created by yuvali on 04/02/2016.
>>>  */
>>> object Program {
>>>
>>>   def main(args: Array[String]): Unit = {
>>>
>>>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>>     val sparkContext = new SparkContext(sc)
>>>
>>>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>>>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>>>
>>>     // Create a dummy stream that generates 10 lines per second
>>>     val stream = ssc.receiverStream(new DummySource(10))
>>>
>>>     // Split the lines into words, and create a paired (key-value) dstream
>>>     val wordStream = stream.flatMap { _.split(" ") }.map(word => (word, 1))
>>>
>>>     // This represents the stream emitted from trackStateFunc. Since we emit
>>>     // every input record with the updated value, this stream will contain
>>>     // the same # of records as the input dstream.
>>>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>>     wordCountStateStream.print()
>>>
>>>     // To make sure data is not deleted by the time we query it interactively
>>>     ssc.remember(Minutes(1))
>>>
>>>     // Don't forget to set the checkpoint directory
>>>     ssc.checkpoint("")
>>>     ssc.start()
>>>     ssc.awaitTermination()
>>>   }
>>>
>>>   // A "pass through" function: it reads the state but never calls state.update
>>>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>>>                      state: State[Long]): Option[(String, Long)] = {
>>>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>>     val output = (key, sum)
>>>     Some(output)
>>>   }
>>> }
>>> {code}
>>>
>>> {code:Title=DummySource}
>>> /**
>>>  * Created by yuvali on 04/02/2016.
>>>  */
>>>
>>> import org.apache.spark.storage.StorageLevel
>>> import scala.util.Random
>>> import org.apache.spark.streaming.receiver._
>>>
>>> class DummySource(ratePerSec: Int)
>>>     extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>>>
>>>   def onStart() {
>>>     // Start the thread that generates the data
>>>     new Thread("Dummy Source") {
>>>       override def run() { receive() }
>>>     }.start()
>>>   }
>>>
>>>   def onStop() {
>>>     // There is nothing much to do, as the thread calling receive()
>>>     // is designed to stop by itself once isStopped() returns true
>>>   }
>>>
>>>   /** Generate data until the receiver is stopped */
>>>   private def receive() {
>>>     while (!isStopped()) {
>>>       store("I am a dummy source " + Random.nextInt(10))
>>>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>>>     }
>>>   }
>>> }
>>> {code}
>>>
>>> The issue resides in *MapWithStateRDDRecord.updateRecordWithData*,
>>> starting at line 55, in the following code block:
>>>
>>> {code}
>>> dataIterator.foreach { case (key, value) =>
>>>   wrappedState.wrap(newStateMap.get(key))
>>>   val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>>>   if (wrappedState.isRemoved) {
>>>     newStateMap.remove(key)
>>>   } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) /* <--- problem is here */ {
>>>     newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>>   }
>>>   mappedData ++= returned
>>> }
>>> {code}
>>>
>>> If the stream has a timeout set but the state was never set at all, the
>>> "else if" still fires, because the timeout is defined even though
>>> wrappedState is empty; *wrappedState.get()* then throws the exception
>>> above.
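>>>
>>> One possible guard, just as a sketch on my part (I haven't verified this
>>> against the actual implementation, so treat it as an illustration rather
>>> than a patch), would be to take the timeout path only when the state
>>> actually exists:
>>>
>>> {code}
>>> if (wrappedState.isRemoved) {
>>>   newStateMap.remove(key)
>>> } else if (wrappedState.isUpdated ||
>>>     (wrappedState.exists && timeoutThresholdTime.isDefined)) {
>>>   // A timeout alone no longer forces a read of an empty wrappedState;
>>>   // the state is persisted only when it was updated or actually exists.
>>>   newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>> }
>>> {code}
>>>
>>> Someone familiar with the internals would need to confirm this doesn't
>>> change the intended timeout semantics.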
>>>
>>> If it is mandatory to update the state for each entry of *mapWithState*,
>>> then this code should throw a better exception than
>>> "NoSuchElementException", which doesn't really say anything to the
>>> developer.
>>>
>>> I haven't provided a proper fix myself because I'm not familiar with the
>>> Spark implementation, but it seems there either needs to be an extra
>>> check that the state is set (as sketched above), or, as previously
>>> stated, a better exception message.
>>
>
> --
> Best Regards,
> Yuval Itzchakov.