I think we need to add a port: http://serverfault.com/questions/317903/aws-ec2-open-port-8080
Do you remember doing anything like this earlier for AWS?
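From the link, it looks like a matter of adding an inbound rule to the instance's security group, e.g. something like this (a sketch only; the security group name is a placeholder):

{code}
aws ec2 authorize-security-group-ingress \
  --group-name my-spark-sg \
  --protocol tcp --port 8080 --cidr 0.0.0.0/0
{code}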
On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:

Awesome. Thanks for the super fast reply.

On Thu, Feb 4, 2016, 21:16 Tathagata Das <tathagata.das1...@gmail.com> wrote:

Shixiong has already opened the PR: https://github.com/apache/spark/pull/11081

On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:

Let me know if you do need a pull request for this; I can make that happen (given someone does a vast PR to make sure I'm understanding this problem right).

On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

Thanks for reporting it. I will take a look.

On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yuva...@gmail.com> wrote:

Hi,
I've been playing with the experimental PairDStreamFunctions.mapWithState feature and I seem to have stumbled across a bug. I was wondering if anyone else has been seeing this behavior.

I've opened an issue in the Spark JIRA; I simply want to pass this along in case anyone else is experiencing such a failure, or perhaps someone has insight into whether this is actually a bug: SPARK-13195 <https://issues.apache.org/jira/browse/SPARK-13195>

Using the new Spark mapWithState API, I've encountered a bug when setting a timeout for mapWithState but doing no explicit state handling.

h1. Steps to reproduce:

1. Create a method that conforms to the StateSpec signature, and make sure not to update any state inside it using *state.update*. Simply create a "pass through" method; it may even be empty.
2. Create a StateSpec object with the method from step 1, and explicitly set a timeout using the *StateSpec.timeout* method.
3. Create a DStream pipeline that uses mapWithState with the given StateSpec.
4. Run the code using spark-submit.
You'll see that the method ends up throwing the following exception:

{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
  at org.apache.spark.streaming.StateImpl.get(State.scala:150)
  at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
  at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
  at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
  at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
{code}

h1. Sample code to reproduce the issue:

{code:Title=MainObject}
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by yuvali on 04/02/2016.
 */
object Program {

  def main(args: Array[String]): Unit = {

    val sc = new SparkConf().setAppName("mapWithState bug reproduce")
    val sparkContext = new SparkContext(sc)

    val ssc = new StreamingContext(sparkContext, Seconds(4))
    val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))

    // Create a stream that generates `ratePerSec` (here 10) lines per second
    val stream = ssc.receiverStream(new DummySource(10))

    // Split the lines into words, and create a paired (key-value) dstream
    val wordStream = stream.flatMap { _.split(" ") }.map(word => (word, 1))

    // This represents the emitted stream from the trackStateFunc. Since we
    // emit every input record with the updated value, this stream will
    // contain the same # of records as the input dstream.
    val wordCountStateStream = wordStream.mapWithState(stateSpec)
    wordCountStateStream.print()

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))

    // Don't forget to set the checkpoint directory
    ssc.checkpoint("")
    ssc.start()
    ssc.awaitTermination()
  }

  // A "pass through" function: it never calls state.update, which is what
  // triggers the bug when a timeout is set.
  def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
                     state: State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    val output = (key, sum)
    Some(output)
  }
}
{code}
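Incidentally, the crash does not occur if the mapping function always sets the state explicitly. A minimal workaround sketch, assuming the same Program object and trackStateFunc signature as above (the isTimingOut guard is needed because update throws while a key is timing out):

{code}
def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
                   state: State[Long]): Option[(String, Long)] = {
  val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
  if (!state.isTimingOut()) {
    state.update(sum) // explicitly set the state, so it is never left unset
  }
  Some((key, sum))
}
{code}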
{code:Title=DummySource}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver._

import scala.util.Random

/**
 * Created by yuvali on 04/02/2016.
 */
class DummySource(ratePerSec: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start the thread that generates the data
    new Thread("Dummy Source") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true.
  }

  /** Generate data until the receiver is stopped. */
  private def receive() {
    while (!isStopped()) {
      store("I am a dummy source " + Random.nextInt(10))
      Thread.sleep((1000.toDouble / ratePerSec).toInt)
    }
  }
}
{code}

The issue resides in *MapWithStateRDDRecord.updateRecordWithData*, starting at line 55, in the following code block:

{code}
dataIterator.foreach { case (key, value) =>
  wrappedState.wrap(newStateMap.get(key))
  val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
  if (wrappedState.isRemoved) {
    newStateMap.remove(key)
  } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) { /* <--- problem is here */
    newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
  }
  mappedData ++= returned
}
{code}

In case the stream has a timeout set but the state was never set at all, the "else if" still follows through, because the timeout is defined even though "wrappedState" is empty; *wrappedState.get()* then throws.

If it is mandatory to update the state for each entry of *mapWithState*, then this code should throw a better exception than "NoSuchElementException", which doesn't really say anything to the developer.

I haven't provided a fix myself because I'm not familiar with the Spark implementation, but it seems there needs to either be an extra check that the state is set, or, as previously stated, a better exception message.
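For illustration, one possible shape of that extra check (a sketch only, not necessarily the actual change in the PR above; I'm assuming the wrapped state exposes the public *State.exists* test for whether a state value is set):

{code}
} else if (wrappedState.isUpdated
    || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
  // Only persist state that was actually set; an unset state with a
  // configured timeout no longer reaches wrappedState.get().
  newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
}
{code}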
--
Best Regards,
Yuval Itzchakov.

--
Thanks & Regards,
Sachin Aggarwal
7760502772