[jira] [Commented] (SPARK-3876) Doing a RDD map/reduce within a DStream map fails with a high enough input rate

2015-01-22 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14288851#comment-14288851
 ] 

Tathagata Das commented on SPARK-3876:
--

This seems to be a fundamentally incorrect computation to do with RDDs. You 
cannot refer to another RDD in the anotherRDD.map( ). If you have combine 
information between two RDDs, there are couple of ways of doing it. 

1. Use multiple RDD ops like join, cogroup, cartesian, etc. 
2. Collect and broadcast out the information of one RDD and then use the 
broadcast variable in the map (see Spark programming guide for broadcast 
variables). 

So this pattern is incorrect and therefore this JIRA is kinda invalid. I am 
closing this JIRA.

> Doing a RDD map/reduce within a DStream map fails with a high enough input 
> rate
> ---
>
> Key: SPARK-3876
> URL: https://issues.apache.org/jira/browse/SPARK-3876
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.2
>Reporter: Andrei Filip
>
> Having a custom receiver than generates random strings at custom rates: 
> JavaRandomSentenceReceiver
> A class that does work on a received string:
> class LengthGetter implements Serializable{
>   public int getStrLength(String s){
>   return s.length();
>   }
> }
> The following code:
> List objList = Arrays.asList(new LengthGetter(), new 
> LengthGetter(), new LengthGetter());
>   
>   final JavaRDD objRdd = sc.parallelize(objList);
>   
>   
>   JavaInputDStream sentences = jssc.receiverStream(new 
> JavaRandomSentenceReceiver(frequency));
>   
>   sentences.map(new Function() {
>   @Override
>   public Integer call(final String input) throws 
> Exception {
>   Integer res = objRdd.map(new 
> Function() {
>   @Override
>   public Integer call(LengthGetter lg) 
> throws Exception {
>   return lg.getStrLength(input);
>   }
>   }).reduce(new Function2 Integer>() {
>   
>   @Override
>   public Integer call(Integer left, 
> Integer right) throws Exception {
>   return left + right;
>   }
>   });
>   
>   
>   return res;
>   }   
>   }).print();
> fails for high enough frequencies with the following stack trace:
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 3.0:0 failed 1 times, most recent failure: Exception 
> failure in TID 3 on host localhost: java.lang.NullPointerException
> org.apache.spark.rdd.RDD.map(RDD.scala:270)
> org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:72)
> org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:29)
> Other information that might be useful is that my current batch duration is 
> set to 1sec and the frequencies for JavaRandomSentenceReceiver at which the 
> application fails are as low as 2Hz (1Hz for example works)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3876) Doing a RDD map/reduce within a DStream map fails with a high enough input rate

2014-10-12 Thread Andrei Filip (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169019#comment-14169019
 ] 

Andrei Filip commented on SPARK-3876:
-

In a nutshell, the use case aims to parallelize many operations performed on 
the same input, and aggregate the outputs. This suggestion was given to me in 
this discussion: 
http://chat.stackoverflow.com/rooms/61251/discussion-between-smola-and-andrei 
(towards the end)

To be honest, the more fundamental question is whether spark streaming is 
actually appropriate for this sort of use case.

> Doing a RDD map/reduce within a DStream map fails with a high enough input 
> rate
> ---
>
> Key: SPARK-3876
> URL: https://issues.apache.org/jira/browse/SPARK-3876
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.2
>Reporter: Andrei Filip
>
> Having a custom receiver than generates random strings at custom rates: 
> JavaRandomSentenceReceiver
> A class that does work on a received string:
> class LengthGetter implements Serializable{
>   public int getStrLength(String s){
>   return s.length();
>   }
> }
> The following code:
> List objList = Arrays.asList(new LengthGetter(), new 
> LengthGetter(), new LengthGetter());
>   
>   final JavaRDD objRdd = sc.parallelize(objList);
>   
>   
>   JavaInputDStream sentences = jssc.receiverStream(new 
> JavaRandomSentenceReceiver(frequency));
>   
>   sentences.map(new Function() {
>   @Override
>   public Integer call(final String input) throws 
> Exception {
>   Integer res = objRdd.map(new 
> Function() {
>   @Override
>   public Integer call(LengthGetter lg) 
> throws Exception {
>   return lg.getStrLength(input);
>   }
>   }).reduce(new Function2 Integer>() {
>   
>   @Override
>   public Integer call(Integer left, 
> Integer right) throws Exception {
>   return left + right;
>   }
>   });
>   
>   
>   return res;
>   }   
>   }).print();
> fails for high enough frequencies with the following stack trace:
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 3.0:0 failed 1 times, most recent failure: Exception 
> failure in TID 3 on host localhost: java.lang.NullPointerException
> org.apache.spark.rdd.RDD.map(RDD.scala:270)
> org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:72)
> org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:29)
> Other information that might be useful is that my current batch duration is 
> set to 1sec and the frequencies for JavaRandomSentenceReceiver at which the 
> application fails are as low as 2Hz (1Hz for example works)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3876) Doing a RDD map/reduce within a DStream map fails with a high enough input rate

2014-10-10 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14166832#comment-14166832
 ] 

Saisai Shao commented on SPARK-3876:


Hi [~afilip], is there any specific purpose you need to do rdd's map and reduce 
operation inside DStream's map function. I don't think this code can be worked 
and correctly executed in remote side, this code can be translated into RDD's 
transformation in each batch duration, like:

rdd.map { r => rdd1.map(c => op(c, r)).reduce(...) }.foreach(...)

since rdd's transformation should be divided into stages in driver side and be 
executed in executor side, remotely using rdd in closure may get error.

 If you want to use this RDD as a lookup table, you can build a local hashmap 
and broadcast to the remote side for looking up. So maybe this is not a bug.

> Doing a RDD map/reduce within a DStream map fails with a high enough input 
> rate
> ---
>
> Key: SPARK-3876
> URL: https://issues.apache.org/jira/browse/SPARK-3876
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.2
>Reporter: Andrei Filip
>
> Having a custom receiver than generates random strings at custom rates: 
> JavaRandomSentenceReceiver
> A class that does work on a received string:
> class LengthGetter implements Serializable{
>   public int getStrLength(String s){
>   return s.length();
>   }
> }
> The following code:
> List objList = Arrays.asList(new LengthGetter(), new 
> LengthGetter(), new LengthGetter());
>   
>   final JavaRDD objRdd = sc.parallelize(objList);
>   
>   
>   JavaInputDStream sentences = jssc.receiverStream(new 
> JavaRandomSentenceReceiver(frequency));
>   
>   sentences.map(new Function() {
>   @Override
>   public Integer call(final String input) throws 
> Exception {
>   Integer res = objRdd.map(new 
> Function() {
>   @Override
>   public Integer call(LengthGetter lg) 
> throws Exception {
>   return lg.getStrLength(input);
>   }
>   }).reduce(new Function2 Integer>() {
>   
>   @Override
>   public Integer call(Integer left, 
> Integer right) throws Exception {
>   return left + right;
>   }
>   });
>   
>   
>   return res;
>   }   
>   }).print();
> fails for high enough frequencies with the following stack trace:
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 3.0:0 failed 1 times, most recent failure: Exception 
> failure in TID 3 on host localhost: java.lang.NullPointerException
> org.apache.spark.rdd.RDD.map(RDD.scala:270)
> org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:72)
> org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:29)
> Other information that might be useful is that my current batch duration is 
> set to 1sec and the frequencies for JavaRandomSentenceReceiver at which the 
> application fails are as low as 2Hz (1Hz for example works)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org