Sorry for the spam; I replied in the wrong thread. Sleepy head :-(

On Fri, Feb 5, 2016 at 1:15 AM, Sachin Aggarwal <different.sac...@gmail.com>
wrote:

>
> http://coenraets.org/blog/2011/11/set-up-an-amazon-ec2-instance-with-tomcat-and-mysql-5-minutes-tutorial/
>
> The default Tomcat server uses port 8080. You need to open that port on
> your instance to make sure your Tomcat server is available on the Web (you
> could also change the default port). In the AWS Management Console, select
> Security Groups (left navigation bar), select the quick-start group, the
> Inbound tab and add port 8080. Make sure you click “Add Rule” and then
> “Apply Rule Changes”.
>
> On Fri, Feb 5, 2016 at 1:14 AM, Sachin Aggarwal <
> different.sac...@gmail.com> wrote:
>
>> I think we need to open the port:
>> http://serverfault.com/questions/317903/aws-ec2-open-port-8080
>>
>>
>> Do you remember doing anything like this earlier for AWS 1?
>>
>> On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov <yuva...@gmail.com>
>> wrote:
>>
>>> Awesome. Thanks for the super fast reply.
>>>
>>>
>>> On Thu, Feb 4, 2016, 21:16 Tathagata Das <tathagata.das1...@gmail.com>
>>> wrote:
>>>
>>>> Shixiong has already opened the PR -
>>>> https://github.com/apache/spark/pull/11081
>>>>
>>>> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <yuva...@gmail.com>
>>>> wrote:
>>>>
>>>>> Let me know if you do need a pull request for this, I can make that
>>>>> happen (given someone does a vast PR to make sure I'm understanding this
>>>>> problem right).
>>>>>
>>>>> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
>>>>> shixi...@databricks.com> wrote:
>>>>>
>>>>>> Thanks for reporting it. I will take a look.
>>>>>>
>>>>>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yuva...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I've been playing with the experimental
>>>>>>> PairDStreamFunctions.mapWithState feature and seem to have stumbled
>>>>>>> across a bug. I was wondering if anyone else has been seeing this
>>>>>>> behavior.
>>>>>>>
>>>>>>> I've opened an issue in the Spark JIRA. I simply want to pass this
>>>>>>> along in case anyone else is experiencing the same failure, or in case
>>>>>>> someone can tell whether this is actually a bug: SPARK-13195
>>>>>>> <https://issues.apache.org/jira/browse/SPARK-13195>
>>>>>>>
>>>>>>> Using the new Spark mapWithState API, I've encountered a bug when
>>>>>>> setting a timeout for mapWithState without any explicit state handling.
>>>>>>>
>>>>>>> h1. Steps to reproduce:
>>>>>>>
>>>>>>> 1. Create a method which conforms to the StateSpec signature, and make
>>>>>>> sure not to update any state inside it using *state.update*. Simply
>>>>>>> create a "pass through" method; it may even be empty.
>>>>>>> 2. Create a StateSpec object with the method from step 1, and explicitly
>>>>>>> set a timeout using the *StateSpec.timeout* method.
>>>>>>> 3. Create a DStream pipeline that uses mapWithState with the given
>>>>>>> StateSpec.
>>>>>>> 4. Run the code using spark-submit. You'll see that the job ends up
>>>>>>> throwing the following exception:
>>>>>>>
>>>>>>> {code}
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
>>>>>>>         at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>>>>>>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>>>>>>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>>>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>         at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>>>>>>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>>>>>>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>>>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>> {code}
>>>>>>>
>>>>>>> h1. Sample code to reproduce the issue:
>>>>>>>
>>>>>>> {code:Title=MainObject}
>>>>>>> import org.apache.spark.streaming._
>>>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>>>>
>>>>>>> /**
>>>>>>>   * Created by yuvali on 04/02/2016.
>>>>>>>   */
>>>>>>> object Program {
>>>>>>>
>>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>>
>>>>>>>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>>>>>>     val sparkContext = new SparkContext(sc)
>>>>>>>
>>>>>>>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>>>>>>>     // A timeout is set, but trackStateFunc never calls state.update
>>>>>>>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>>>>>>>
>>>>>>>     // Create a stream that generates 10 lines per second
>>>>>>>     val stream = ssc.receiverStream(new DummySource(10))
>>>>>>>
>>>>>>>     // Split the lines into words, and create a paired (key-value) dstream
>>>>>>>     val wordStream = stream.flatMap {
>>>>>>>       _.split(" ")
>>>>>>>     }.map(word => (word, 1))
>>>>>>>
>>>>>>>     // This represents the emitted stream from the trackStateFunc.
>>>>>>>     // Since we emit every input record with the updated value,
>>>>>>>     // this stream will contain the same # of records as the input dstream.
>>>>>>>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>>>>>>     wordCountStateStream.print()
>>>>>>>
>>>>>>>     // To make sure data is not deleted by the time we query it interactively
>>>>>>>     ssc.remember(Minutes(1))
>>>>>>>
>>>>>>>     // Don't forget to set checkpoint directory
>>>>>>>     ssc.checkpoint("")
>>>>>>>     ssc.start()
>>>>>>>     ssc.awaitTermination()
>>>>>>>   }
>>>>>>>
>>>>>>>   // Reads the state but never calls state.update (see step 1 above)
>>>>>>>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>>>>>>>                      state: State[Long]): Option[(String, Long)] = {
>>>>>>>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>>>>>>     val output = (key, sum)
>>>>>>>     Some(output)
>>>>>>>   }
>>>>>>> }
>>>>>>> {code}
>>>>>>>
>>>>>>> {code:Title=DummySource}
>>>>>>>
>>>>>>> /**
>>>>>>>   * Created by yuvali on 04/02/2016.
>>>>>>>   */
>>>>>>>
>>>>>>> import org.apache.spark.storage.StorageLevel
>>>>>>> import scala.util.Random
>>>>>>> import org.apache.spark.streaming.receiver._
>>>>>>>
>>>>>>> class DummySource(ratePerSec: Int)
>>>>>>>   extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>>>>>>>
>>>>>>>   def onStart(): Unit = {
>>>>>>>     // Start the thread that generates data
>>>>>>>     new Thread("Dummy Source") {
>>>>>>>       override def run(): Unit = { receive() }
>>>>>>>     }.start()
>>>>>>>   }
>>>>>>>
>>>>>>>   def onStop(): Unit = {
>>>>>>>     // There is nothing much to do, as the thread calling receive()
>>>>>>>     // is designed to stop by itself once isStopped() returns true
>>>>>>>   }
>>>>>>>
>>>>>>>   /** Generate dummy lines until the receiver is stopped */
>>>>>>>   private def receive(): Unit = {
>>>>>>>     while (!isStopped()) {
>>>>>>>       store("I am a dummy source " + Random.nextInt(10))
>>>>>>>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>> {code}
>>>>>>>
>>>>>>> The issue resides in *MapWithStateRDDRecord.updateRecordWithData*,
>>>>>>> starting at line 55, in the following code block:
>>>>>>>
>>>>>>> {code}
>>>>>>> dataIterator.foreach { case (key, value) =>
>>>>>>>       wrappedState.wrap(newStateMap.get(key))
>>>>>>>       val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>>>>>>>       if (wrappedState.isRemoved) {
>>>>>>>         newStateMap.remove(key)
>>>>>>>       } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) /* <--- problem is here */ {
>>>>>>>         newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>>>>>>       }
>>>>>>>       mappedData ++= returned
>>>>>>> }
>>>>>>> {code}
>>>>>>>
>>>>>>> If the stream has a timeout set but the state was never set, the
>>>>>>> "else-if" branch is still taken because the timeout is defined, even
>>>>>>> though "wrappedState" is empty, so "wrappedState.get()" throws.
>>>>>>>
>>>>>>> If it is mandatory to update state for each entry of *mapWithState*,
>>>>>>> then this code should throw a better exception than
>>>>>>> "NoSuchElementException", which doesn't really say anything to the
>>>>>>> developer.
>>>>>>>
>>>>>>> I haven't provided a fix myself because I'm not familiar with the Spark
>>>>>>> implementation, but it seems to me there needs to be either an extra
>>>>>>> check that the state is set or, as previously stated, a better exception
>>>>>>> message.
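
The "extra check" suggested above could look roughly like the sketch below. It is only an illustration under stated assumptions, not the change made in Spark: SketchState, GuardSketch, updateRecords and the guarded flag are hypothetical stand-ins that model the quoted loop without depending on Spark, so the snippet runs on its own. The idea is to refresh a key on timeout only when some state actually exists for it.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the state wrapper in the quoted loop; not a Spark class.
final class SketchState[S] {
  private var value: Option[S] = None
  private var updated = false
  private var removed = false

  // Reset the wrapper for the next key, loading any previously stored state.
  def wrap(existing: Option[S]): Unit = {
    value = existing
    updated = false
    removed = false
  }
  def exists: Boolean = value.isDefined
  def get(): S = value.getOrElse(throw new NoSuchElementException("State is not set"))
  def update(s: S): Unit = { value = Some(s); updated = true }
  def remove(): Unit = { value = None; removed = true }
  def isUpdated: Boolean = updated
  def isRemoved: Boolean = removed
}

object GuardSketch {
  // Mirrors the shape of the quoted loop. With guarded = true, the timeout
  // branch is only taken when state exists for the current key.
  def updateRecords(
      data: Seq[(String, Int)],
      stateMap: mutable.Map[String, Long],
      timeoutDefined: Boolean,
      guarded: Boolean)(mapping: (String, Int, SketchState[Long]) => Unit): Unit = {
    val wrapped = new SketchState[Long]
    data.foreach { case (key, value) =>
      wrapped.wrap(stateMap.get(key))
      mapping(key, value, wrapped)
      val refreshOnTimeout =
        if (guarded) wrapped.exists && timeoutDefined else timeoutDefined
      if (wrapped.isRemoved) {
        stateMap.remove(key)
      } else if (wrapped.isUpdated || refreshOnTimeout) {
        stateMap.put(key, wrapped.get())
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // A "pass through" mapping function that never touches the state.
    val passThrough: (String, Int, SketchState[Long]) => Unit = (_, _, _) => ()
    val records = Seq("a" -> 1, "b" -> 1)

    // Unguarded condition: get() is called on empty state and throws.
    val failed =
      try {
        updateRecords(records, mutable.Map.empty[String, Long],
          timeoutDefined = true, guarded = false)(passThrough)
        false
      } catch { case _: NoSuchElementException => true }
    println(s"unguarded throws: $failed")

    // Guarded condition: nothing is stored and nothing throws.
    val states = mutable.Map.empty[String, Long]
    updateRecords(records, states, timeoutDefined = true, guarded = true)(passThrough)
    println(s"guarded state map is empty: ${states.isEmpty}")
  }
}
```

On the user side, calling state.update inside the mapping function on every invocation should also avoid the failing path, since the wrapper then always holds a value by the time get() is called.
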
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/PairDStreamFunctions-mapWithState-fails-in-case-timeout-is-set-without-updating-State-S-tp26147.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Yuval Itzchakov.
>>>>>
>>>>
>>>>
>>
>>
>> --
>>
>> Thanks & Regards
>>
>> Sachin Aggarwal
>> 7760502772
>>
>
>
>
>



-- 

Thanks & Regards

Sachin Aggarwal
7760502772
