I am sorry for the spam; I replied in the wrong thread. Sleepy head :-(

On Fri, Feb 5, 2016 at 1:15 AM, Sachin Aggarwal <different.sac...@gmail.com> wrote:
>
> http://coenraets.org/blog/2011/11/set-up-an-amazon-ec2-instance-with-tomcat-and-mysql-5-minutes-tutorial/
>
> The default Tomcat server uses port 8080. You need to open that port on your instance to make sure your Tomcat server is available on the Web (you could also change the default port). In the AWS Management Console, select Security Groups (left navigation bar), select the quick-start group, then the Inbound tab, and add port 8080. Make sure you click “Add Rule” and then “Apply Rule Changes”.
>
> On Fri, Feb 5, 2016 at 1:14 AM, Sachin Aggarwal <different.sac...@gmail.com> wrote:
>
>> I think we need to add the port:
>> http://serverfault.com/questions/317903/aws-ec2-open-port-8080
>>
>> Do you remember doing anything like this earlier for AWS 1?
>>
>> On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:
>>
>>> Awesome. Thanks for the super fast reply.
>>>
>>> On Thu, Feb 4, 2016, 21:16 Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>
>>>> Shixiong has already opened the PR:
>>>> https://github.com/apache/spark/pull/11081
>>>>
>>>> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>
>>>>> Let me know if you do need a pull request for this; I can make that happen (provided someone reviews it to make sure I'm understanding this problem correctly).
>>>>>
>>>>> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>>>>
>>>>>> Thanks for reporting it. I will take a look.
>>>>>>
>>>>>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yuva...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I've been playing with the experimental PairDStreamFunctions.mapWithState feature, and I seem to have stumbled across a bug. I was wondering if anyone else has been seeing this behavior.
>>>>>>>
>>>>>>> I've opened an issue in the Spark JIRA (SPARK-13195 <https://issues.apache.org/jira/browse/SPARK-13195>). I simply want to pass this along in case anyone else is experiencing such a failure, or in case someone has insight into whether this is actually a bug.
>>>>>>>
>>>>>>> Using the new Spark mapWithState API, I've encountered a bug when setting a timeout for mapWithState without any explicit state handling.
>>>>>>>
>>>>>>> h1. Steps to reproduce:
>>>>>>>
>>>>>>> 1. Create a method which conforms to the StateSpec signature, making sure not to update any state inside it using *state.update*. Simply create a "pass-through" method; it may even be empty.
>>>>>>> 2. Create a StateSpec object with the method from step 1, and explicitly set a timeout using the *StateSpec.timeout* method.
>>>>>>> 3. Create a DStream pipeline that uses mapWithState with the given StateSpec.
>>>>>>> 4. Run the code using spark-submit.
>>>>>>> You'll see that the method ends up throwing the following exception:
>>>>>>>
>>>>>>> {code}
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
>>>>>>>     at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>>>>>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>>>>>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>     at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>>>>>>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>>>>>>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> {code}
>>>>>>>
>>>>>>> h1. Sample code to reproduce the issue:
>>>>>>>
>>>>>>> {code:Title=MainObject}
>>>>>>> import org.apache.spark.streaming._
>>>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>>>> /**
>>>>>>>  * Created by yuvali on 04/02/2016.
>>>>>>>  */
>>>>>>> object Program {
>>>>>>>
>>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>>
>>>>>>>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>>>>>>     val sparkContext = new SparkContext(sc)
>>>>>>>
>>>>>>>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>>>>>>>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>>>>>>>
>>>>>>>     // Create a stream that generates 10 lines per second
>>>>>>>     val stream = ssc.receiverStream(new DummySource(10))
>>>>>>>
>>>>>>>     // Split the lines into words, and create a paired (key-value) dstream
>>>>>>>     val wordStream = stream.flatMap {
>>>>>>>       _.split(" ")
>>>>>>>     }.map(word => (word, 1))
>>>>>>>
>>>>>>>     // This represents the emitted stream from the trackStateFunc. Since we emit every input record with the updated value,
>>>>>>>     // this stream will contain the same # of records as the input dstream.
>>>>>>>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>>>>>>     wordCountStateStream.print()
>>>>>>>
>>>>>>>     ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively
>>>>>>>
>>>>>>>     // Don't forget to set checkpoint directory
>>>>>>>     ssc.checkpoint("")
>>>>>>>     ssc.start()
>>>>>>>     ssc.awaitTermination()
>>>>>>>   }
>>>>>>>
>>>>>>>   // Intentionally never calls state.update: this is the "pass-through" method from step 1
>>>>>>>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
>>>>>>>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>>>>>>     val output = (key, sum)
>>>>>>>     Some(output)
>>>>>>>   }
>>>>>>> }
>>>>>>> {code}
>>>>>>>
>>>>>>> {code:Title=DummySource}
>>>>>>>
>>>>>>> /**
>>>>>>>  * Created by yuvali on 04/02/2016.
>>>>>>>  */
>>>>>>>
>>>>>>> import org.apache.spark.storage.StorageLevel
>>>>>>> import scala.util.Random
>>>>>>> import org.apache.spark.streaming.receiver._
>>>>>>>
>>>>>>> class DummySource(ratePerSec: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>>>>>>>
>>>>>>>   def onStart() {
>>>>>>>     // Start the thread that generates the dummy data
>>>>>>>     new Thread("Dummy Source") {
>>>>>>>       override def run() { receive() }
>>>>>>>     }.start()
>>>>>>>   }
>>>>>>>
>>>>>>>   def onStop() {
>>>>>>>     // There is nothing much to do as the thread calling receive()
>>>>>>>     // is designed to stop by itself once isStopped() returns true
>>>>>>>   }
>>>>>>>
>>>>>>>   /** Generate dummy lines (about ratePerSec per second) until the receiver is stopped */
>>>>>>>   private def receive() {
>>>>>>>     while(!isStopped()) {
>>>>>>>       store("I am a dummy source " + Random.nextInt(10))
>>>>>>>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>> {code}
>>>>>>>
>>>>>>> The issue resides in *MapWithStateRDDRecord.updateRecordWithData*, starting at line 55, in the following code block:
>>>>>>>
>>>>>>> {code}
>>>>>>> dataIterator.foreach { case (key, value) =>
>>>>>>>   wrappedState.wrap(newStateMap.get(key))
>>>>>>>   val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>>>>>>>   if (wrappedState.isRemoved) {
>>>>>>>     newStateMap.remove(key)
>>>>>>>   } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) /* <--- problem is here */ {
>>>>>>>     newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>>>>>>   }
>>>>>>>   mappedData ++= returned
>>>>>>> }
>>>>>>> {code}
>>>>>>>
>>>>>>> If the stream has a timeout set but the state was never set, execution still enters the "else if" branch because the timeout is defined, and *wrappedState.get()* is then called on an empty "wrappedState", which throws.
>>>>>>>
>>>>>>> If it is mandatory to update the state for each entry of *mapWithState*, then this code should throw a more helpful exception than "NoSuchElementException", which doesn't really say anything to the developer.
>>>>>>>
>>>>>>> I haven't provided a fix myself because I'm not familiar with the Spark implementation, but it seems to me there needs to be either an extra check for whether the state is set or, as stated above, a better exception message.
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PairDStreamFunctions-mapWithState-fails-in-case-timeout-is-set-without-updating-State-S-tp26147.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Yuval Itzchakov.
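The failing condition from the report can be reproduced outside Spark with a small model. This is a minimal sketch, not Spark's code: `WrappedState`, `MapWithStateModel`, `updateRecords`, `guardExists` and `passThrough` are hypothetical names, state removal and batch timestamps are left out, and the `exists` guard is only one plausible form of the "extra check" the report suggests, not necessarily what the linked PR does.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's internal wrapped state.
// It only tracks whether a value is present and whether update() was called.
final class WrappedState[S] {
  private var value: Option[S] = None
  private var updated = false

  // Point the wrapper at the stored state (if any) for the next key.
  def wrap(existing: Option[S]): Unit = { value = existing; updated = false }
  def exists: Boolean = value.isDefined
  def isUpdated: Boolean = updated
  def getOption: Option[S] = value
  // Throws the same exception type and message seen in the stack trace.
  def get(): S = value.getOrElse(throw new NoSuchElementException("State is not set"))
  def update(newState: S): Unit = { value = Some(newState); updated = true }
}

object MapWithStateModel {
  type MappingFn = (String, Option[Int], WrappedState[Long]) => Option[(String, Long)]

  // Mirrors the shape of the quoted loop (removal and timestamps omitted).
  // guardExists = false keeps the reported condition; true adds an exists check.
  def updateRecords(
      data: Seq[(String, Int)],
      stateMap: mutable.Map[String, Long],
      timeoutDefined: Boolean,
      guardExists: Boolean,
      mappingFunction: MappingFn): Seq[(String, Long)] = {
    val wrappedState = new WrappedState[Long]
    val mappedData = Seq.newBuilder[(String, Long)]
    data.foreach { case (key, value) =>
      wrappedState.wrap(stateMap.get(key))
      val returned = mappingFunction(key, Some(value), wrappedState)
      // Reported condition is isUpdated || timeoutDefined, which reaches get() on an empty state.
      val putForTimeout = timeoutDefined && (!guardExists || wrappedState.exists)
      if (wrappedState.isUpdated || putForTimeout) {
        stateMap.put(key, wrappedState.get())
      }
      returned.foreach(mappedData += _)
    }
    mappedData.result()
  }

  // The "pass-through" function from the report: reads the state, never updates it.
  val passThrough: MappingFn = (key, value, state) =>
    Some((key, value.getOrElse(0).toLong + state.getOption.getOrElse(0L)))
}
```

With `timeoutDefined = true` and `guardExists = false`, a brand-new key fed through `passThrough` throws `NoSuchElementException("State is not set")`. With the guard, the new key simply gets no stored state, while an existing state is still put back; in the quoted Spark code that put also records `batchTime.milliseconds`, presumably so the timeout clock restarts.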
>>
>> --
>>
>> Thanks & Regards
>>
>> Sachin Aggarwal
>> 7760502772
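Until a fix lands, a user-side workaround that follows from the report's reasoning is to make sure the mapping function always sets the state when a timeout is configured. The sketch below models this without a Spark dependency: `ModelState` and `Workaround` are hypothetical names, `ModelState` mirrors only a few method names of Spark's `State` (`exists`, `get`, `getOption`, `update`, `isTimingOut`), and `batchTime` is dropped. In real code you would keep the original `trackStateFunc` signature and add the `state.update(sum)` call. The `isTimingOut` check reflects our understanding of Spark's documented rule that a state which is timing out cannot be updated.

```scala
// Hypothetical stand-in mirroring a few method names of Spark's State[S].
// It is NOT Spark's class, just enough to run the mapping function standalone.
final class ModelState[S](initial: Option[S], timingOut: Boolean = false) {
  private var value: Option[S] = initial
  private var updated = false

  def exists: Boolean = value.isDefined
  def get(): S = value.getOrElse(throw new NoSuchElementException("State is not set"))
  def getOption: Option[S] = value
  def isTimingOut: Boolean = timingOut
  def isUpdated: Boolean = updated
  // Modeled on the documented rule that a timing-out state cannot be updated.
  def update(newState: S): Unit = {
    require(!timingOut, "Cannot update a state that is timing out")
    value = Some(newState)
    updated = true
  }
}

object Workaround {
  // Same output as the report's trackStateFunc, but it also stores the running
  // sum, so the state is always set when a timeout is configured.
  def trackStateFunc(key: String, value: Option[Int], state: ModelState[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    if (!state.isTimingOut) state.update(sum)
    Some((key, sum))
  }
}
```

The update is skipped for timing-out calls, where (as far as we understand the API) the function receives `None` as the value and only gets a last chance to emit output for the key.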