[ 
https://issues.apache.org/jira/browse/SPARK-3876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169019#comment-14169019
 ] 

Andrei Filip commented on SPARK-3876:
-------------------------------------

In a nutshell, the use case aims to parallelize many operations performed on 
the same input, and aggregate the outputs. This suggestion was given to me in 
this discussion: 
http://chat.stackoverflow.com/rooms/61251/discussion-between-smola-and-andrei 
(towards the end)

To be honest, the more fundamental question is whether Spark Streaming is 
actually appropriate for this sort of use case.
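
For what it's worth, nested RDD operations inside a DStream map are not 
supported: the RDD reference that gets serialized into the outer closure is 
unusable on the executors, which is consistent with the NPE below. For a 
small set of operations, applying them from a plain local collection inside 
the map avoids the nesting entirely. A minimal sketch of that per-record 
aggregation in plain Java (class and method names here are hypothetical, 
and Spark is deliberately left out so the aggregation logic stands alone):

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class LocalAggregate {
    // Same worker class as in the report.
    static class LengthGetter implements Serializable {
        public int getStrLength(String s) {
            return s.length();
        }
    }

    // Applies every LengthGetter to the input and sums the results locally,
    // with no nested RDD -- safe to call from inside a DStream map closure
    // as long as the list itself is serializable.
    static int aggregate(List<LengthGetter> getters, String input) {
        int sum = 0;
        for (LengthGetter lg : getters) {
            sum += lg.getStrLength(input);
        }
        return sum;
    }

    public static void main(String[] args) {
        List<LengthGetter> getters = Arrays.asList(
                new LengthGetter(), new LengthGetter(), new LengthGetter());
        // 3 getters, each returning the length of "hello" (5), so 15.
        System.out.println(aggregate(getters, "hello"));
    }
}
```

The local list would simply be captured by the `Function<String, Integer>` 
passed to `sentences.map(...)`, replacing the `objRdd.map(...).reduce(...)` 
round trip.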

> Doing a RDD map/reduce within a DStream map fails with a high enough input 
> rate
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-3876
>                 URL: https://issues.apache.org/jira/browse/SPARK-3876
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.2
>            Reporter: Andrei Filip
>
> Having a custom receiver that generates random strings at custom rates: 
> JavaRandomSentenceReceiver
> A class that does work on a received string:
>
>     class LengthGetter implements Serializable {
>         public int getStrLength(String s) {
>             return s.length();
>         }
>     }
> The following code:
>
>     List<LengthGetter> objList = Arrays.asList(new LengthGetter(),
>             new LengthGetter(), new LengthGetter());
>
>     final JavaRDD<LengthGetter> objRdd = sc.parallelize(objList);
>
>     JavaInputDStream<String> sentences =
>             jssc.receiverStream(new JavaRandomSentenceReceiver(frequency));
>
>     sentences.map(new Function<String, Integer>() {
>         @Override
>         public Integer call(final String input) throws Exception {
>             Integer res = objRdd.map(new Function<LengthGetter, Integer>() {
>                 @Override
>                 public Integer call(LengthGetter lg) throws Exception {
>                     return lg.getStrLength(input);
>                 }
>             }).reduce(new Function2<Integer, Integer, Integer>() {
>                 @Override
>                 public Integer call(Integer left, Integer right) throws Exception {
>                     return left + right;
>                 }
>             });
>             return res;
>         }
>     }).print();
> fails for high enough frequencies with the following stack trace:
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 3.0:0 failed 1 times, most recent failure: Exception 
> failure in TID 3 on host localhost: java.lang.NullPointerException
>         org.apache.spark.rdd.RDD.map(RDD.scala:270)
>         org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:72)
>         org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:29)
> Other information that might be useful: the current batch duration is set 
> to 1 s, and the application fails for JavaRandomSentenceReceiver 
> frequencies as low as 2 Hz (1 Hz, for example, works).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
