[ https://issues.apache.org/jira/browse/SPARK-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167177#comment-15167177 ]
NITESH VERMA edited comment on SPARK-13488 at 2/25/16 1:10 PM:
---------------------------------------------------------------

Hi Sean, let me try to explain my problem. I am trying to use the new mapWithState API, starting from the sample code at https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java, specifically this snippet:

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

I have added a timeout call at the end, like this:

wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD).timeout(new Duration(1000)));

With that change I get the exceptions attached above. The only other thing I changed in the dependencies is to use Guava's Optional instead of Spark's, because org.apache.spark.api.java.Optional (introduced by SPARK-4819) is not available in 1.6. Could you please help me understand what I am missing? I am new to Spark. My Maven dependencies for Spark are:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>1.6.0</version>
</dependency>

Thanks.
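For reference, below is a minimal, self-contained sketch of the stateful word count with the timeout added, assuming Spark 1.6.x with the Scala 2.11 artifacts listed above and Guava's Optional. The class name, socket host/port, checkpoint directory, and initial-state contents are placeholders, and the isTimingOut() guard is only a defensive assumption (when a key times out, the mapping function may be invoked with no new value, and a timing-out state cannot be updated); it is not presented as a confirmed fix for the exception reported below.

import java.util.Arrays;
import java.util.List;

import com.google.common.base.Optional;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public final class StatefulWordCountWithTimeout {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCountWithTimeout");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
    ssc.checkpoint(".");  // mapWithState requires a checkpoint directory

    // Words arriving on a socket, paired with a count of 1, as in the example.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
      }
    });
    JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, 1);
          }
        });

    // Cumulative count function. The isTimingOut() check skips state.update()
    // on the timeout path, where no new value arrives and the state is being removed.
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
        new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
            int sum = one.or(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
            if (!state.isTimingOut()) {
              state.update(sum);
            }
            return output;
          }
        };

    // Initial state, as in the example.
    List<Tuple2<String, Integer>> tuples =
        Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);

    // Stateful DStream with a 1-second idle timeout.
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
        wordsDstream.mapWithState(
            StateSpec.function(mappingFunc)
                     .initialState(initialRDD)
                     .timeout(new Duration(1000)));

    stateDstream.print();
    ssc.start();
    ssc.awaitTermination();
  }
}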
> PairDStreamFunctions.mapWithState fails in case timeout is set
> java.util.NoSuchElementException: None.get
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13488
>                 URL: https://issues.apache.org/jira/browse/SPARK-13488
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.0
>            Reporter: NITESH VERMA
>
> Using the new Spark mapWithState API, I've encountered an issue when setting a timeout for mapWithState.
> I am using the mapWithState API with the timeout functionality, and I get the exception below when the timeout interval is reached for idle data.
> I am using the example located at
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
> but with some changes:
> 1. The org.apache.spark.api.java.Optional class is not available in 1.6, so I am using the Guava library's Optional.
> 2. I have used the timeout functionality.
> Below is part of the code:
>
> JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
>     new PairFunction<String, String, Integer>() {
>       @Override
>       public Tuple2<String, Integer> call(String s) {
>         return new Tuple2<>(s, 1);
>       }
>     });
>
> // Update the cumulative count function
> Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
>     new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
>       @Override
>       public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
>         int sum = one.or(0) + (state.exists() ? state.get() : 0);
>         Tuple2<String, Integer> output = new Tuple2<>(word, sum);
>         state.update(sum);
>         return output;
>       }
>     };
>
> // DStream made of get cumulative counts that get updated in every batch
> JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
>     wordsDstream.mapWithState(
>         StateSpec.function(mappingFunc).initialState(initialRDD).timeout(new Duration(1000)));
>
> When I ran the code above, I got the exception below:
>
> 16/02/25 11:41:33 ERROR Executor: Exception in task 0.0 in stage 157.0 (TID 22)
> java.util.NoSuchElementException: None.get
>         at scala.None$.get(Option.scala:313)
>         at scala.None$.get(Option.scala:311)
>         at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
>         at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
>         at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>         at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> 16/02/25 11:41:33 WARN TaskSetManager: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
>         at scala.None$.get(Option.scala:313)
>         at scala.None$.get(Option.scala:311)
>         at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
>         at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
>         at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>         at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> 16/02/25 11:41:33 ERROR TaskSetManager: Task 0 in stage 157.0 failed 1 times; aborting job
> 16/02/25 11:41:33 ERROR JobScheduler: Error running job streaming job 1456380693000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 157.0 failed 1 times, most recent failure: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
>         at scala.None$.get(Option.scala:313)
>         at scala.None$.get(Option.scala:311)
>         at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
>         at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
>         at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>         at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org