i think we need to add port
http://serverfault.com/questions/317903/aws-ec2-open-port-8080


do u remember doing anything like this earlier for aws 1

On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:

> Awesome. Thanks for the super fast reply.
>
>
> On Thu, Feb 4, 2016, 21:16 Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> Shixiong has already opened the PR -
>> https://github.com/apache/spark/pull/11081
>>
>> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <yuva...@gmail.com>
>> wrote:
>>
>>> Let me know if you do need a pull request for this, I can make that
>>> happen (given someone does a vast PR to make sure I'm understanding this
>>> problem right).
>>>
>>> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> Thanks for reporting it. I will take a look.
>>>>
>>>> On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yuva...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I've been playing with the expiramental
>>>>> PairDStreamFunctions.mapWithState
>>>>> feature and I've seem to have stumbled across a bug, and was wondering
>>>>> if
>>>>> anyone else has been seeing this behavior.
>>>>>
>>>>> I've opened up an issue in the Spark JIRA, I simply want to pass this
>>>>> along
>>>>> in case anyone else is experiencing such a failure or perhaps someone
>>>>> has
>>>>> insightful information if this is actually a bug:  SPARK-13195
>>>>> <https://issues.apache.org/jira/browse/SPARK-13195>
>>>>>
>>>>> Using the new spark mapWithState API, I've encountered a bug when
>>>>> setting a
>>>>> timeout for mapWithState but no explicit state handling.
>>>>>
>>>>> h1. Steps to reproduce:
>>>>>
>>>>> 1. Create a method which conforms to the StateSpec signature, make
>>>>> sure to
>>>>> not update any state inside it using *state.update*. Simply create a
>>>>> "pass
>>>>> through" method, may even be empty.
>>>>> 2. Create a StateSpec object with method from step 1, which explicitly
>>>>> sets
>>>>> a timeout using *StateSpec.timeout* method.
>>>>> 3. Create a DStream pipeline that uses mapWithState with the given
>>>>> StateSpec.
>>>>> 4. Run code using spark-submit. You'll see that the method ends up
>>>>> throwing
>>>>> the following exception:
>>>>>
>>>>> {code}
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 0 in
>>>>> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>>>>> 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not
>>>>> set
>>>>>         at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>>>>>         at
>>>>>
>>>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>>>>>         at
>>>>>
>>>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>         at
>>>>>
>>>>> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>>>>>         at
>>>>>
>>>>> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>>>>>         at
>>>>>
>>>>> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>>         at
>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>>>>         at
>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>         at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>         at
>>>>>
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>         at
>>>>>
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>> {code}
>>>>>
>>>>> h1. Sample code to reproduce the issue:
>>>>>
>>>>> {code:Title=MainObject}
>>>>> import org.apache.spark.streaming._
>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>> /**
>>>>>   * Created by yuvali on 04/02/2016.
>>>>>   */
>>>>> object Program {
>>>>>
>>>>>   def main(args: Array[String]): Unit = {
>>>>>
>>>>>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>>>>     val sparkContext = new SparkContext(sc)
>>>>>
>>>>>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>>>>>     val stateSpec = StateSpec.function(trackStateFunc
>>>>> _).timeout(Seconds(60))
>>>>>
>>>>>     // Create a stream that generates 1000 lines per second
>>>>>     val stream = ssc.receiverStream(new DummySource(10))
>>>>>
>>>>>     // Split the lines into words, and create a paired (key-value)
>>>>> dstream
>>>>>     val wordStream = stream.flatMap {
>>>>>       _.split(" ")
>>>>>     }.map(word => (word, 1))
>>>>>
>>>>>     // This represents the emitted stream from the trackStateFunc.
>>>>> Since we
>>>>> emit every input record with the updated value,
>>>>>     // this stream will contain the same # of records as the input
>>>>> dstream.
>>>>>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>>>>     wordCountStateStream.print()
>>>>>
>>>>>     ssc.remember(Minutes(1)) // To make sure data is not deleted by
>>>>> the time
>>>>> we query it interactively
>>>>>
>>>>>     // Don't forget to set checkpoint directory
>>>>>     ssc.checkpoint("")
>>>>>     ssc.start()
>>>>>     ssc.awaitTermination()
>>>>>   }
>>>>>
>>>>>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>>>>> state: State[Long]): Option[(String, Long)] = {
>>>>>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>>>>     val output = (key, sum)
>>>>>     Some(output)
>>>>>   }
>>>>> }
>>>>> {code}
>>>>>
>>>>> {code:Title=DummySource}
>>>>>
>>>>> /**
>>>>>   * Created by yuvali on 04/02/2016.
>>>>>   */
>>>>>
>>>>> import org.apache.spark.storage.StorageLevel
>>>>> import scala.util.Random
>>>>> import org.apache.spark.streaming.receiver._
>>>>>
>>>>> class DummySource(ratePerSec: Int) extends
>>>>> Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>>>>>
>>>>>   def onStart() {
>>>>>     // Start the thread that receives data over a connection
>>>>>     new Thread("Dummy Source") {
>>>>>       override def run() { receive() }
>>>>>     }.start()
>>>>>   }
>>>>>
>>>>>   def onStop() {
>>>>>     // There is nothing much to do as the thread calling receive()
>>>>>     // is designed to stop by itself isStopped() returns false
>>>>>   }
>>>>>
>>>>>   /** Create a socket connection and receive data until receiver is
>>>>> stopped
>>>>> */
>>>>>   private def receive() {
>>>>>     while(!isStopped()) {
>>>>>       store("I am a dummy source " + Random.nextInt(10))
>>>>>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>>>>>     }
>>>>>   }
>>>>> }
>>>>> {code}
>>>>>
>>>>> The given issue resides in the following
>>>>> *MapWithStateRDDRecord.updateRecordWithData*, starting line 55, in the
>>>>> following code block:
>>>>>
>>>>> {code}
>>>>> dataIterator.foreach { case (key, value) =>
>>>>>       wrappedState.wrap(newStateMap.get(key))
>>>>>       val returned = mappingFunction(batchTime, key, Some(value),
>>>>> wrappedState)
>>>>>       if (wrappedState.isRemoved) {
>>>>>         newStateMap.remove(key)
>>>>>       } else if (wrappedState.isUpdated ||
>>>>> timeoutThresholdTime.isDefined)
>>>>> /* <--- problem is here */ {
>>>>>         newStateMap.put(key, wrappedState.get(),
>>>>> batchTime.milliseconds)
>>>>>       }
>>>>>       mappedData ++= returned
>>>>> }
>>>>> {code}
>>>>>
>>>>> In case the stream has a timeout set, but the state wasn't set at all,
>>>>> the
>>>>> "else-if" will still follow through because the timeout is defined but
>>>>> "wrappedState" is empty and wasn't set.
>>>>>
>>>>> If it is mandatory to update state for each entry of *mapWithState*,
>>>>> then
>>>>> this code should throw a better exception than
>>>>> "NoSuchElementException",
>>>>> which doesn't really saw anything to the developer.
>>>>>
>>>>> I haven't provided a fix myself because I'm not familiar with the spark
>>>>> implementation, but it seems to be there needs to either be an extra
>>>>> check
>>>>> if the state is set, or as previously stated a better exception
>>>>> message.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/PairDStreamFunctions-mapWithState-fails-in-case-timeout-is-set-without-updating-State-S-tp26147.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
>>


-- 

Thanks & Regards

Sachin Aggarwal
7760502772

Reply via email to