New JavaRDD Inside JavaPairDStream

2015-09-11 Thread Rachana Srivastava
Hello all,

Can we create a JavaRDD while processing a stream, from Kafka for example?
The following code throws a serialization exception. I am not sure if this is
feasible.

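JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));
JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
// Keep only the message value from each Kafka (key, value) pair
JavaDStream<String> lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
JavaPairDStream<String, String> wordCounts = lines.mapToPair(
    new PairFunction<String, String, String>() {
      // Called for every record, inside tasks running on the executors
      public Tuple2<String, String> call(String urlString) {
        String propertiesFile =
            "/home/cloudera/Desktop/sample/input/featurelist.properties";
        // Uses jsc, the driver's JavaSparkContext, from inside this function
        JavaRDD<String> propertiesFileRDD = jsc.textFile(propertiesFile);
        JavaPairRDD<String, String> featureKeyClassPair =
            propertiesFileRDD.mapToPair(
                new PairFunction<String, String, String>() {
                  public Tuple2<String, String> call(String property) {
                    return new Tuple2<String, String>(
                        property.split("=")[0], property.split("=")[1]);
                  }
                });
        featureKeyClassPair.count();
        return new Tuple2<String, String>(urlString, featureScore);
      }
    });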



Re: New JavaRDD Inside JavaPairDStream

2015-09-11 Thread Cody Koeninger
No, in general you can't make new RDDs in code running on the executors.
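A minimal sketch of why the code in the question ends in a serialization exception, modeled in plain Java so it runs without Spark. Spark serializes the function passed to mapToPair with Java serialization in order to ship it to the executors, and that function captures jsc, a JavaSparkContext, which does not implement Serializable (and even if it could be shipped, a SparkContext is only usable on the driver). DriverOnlyContext is a hypothetical stand-in for the context; CapturesContext and ClosureSerializationDemo are likewise illustrative names, not Spark APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical stand-in for a driver-only object such as JavaSparkContext:
// it deliberately does NOT implement Serializable.
class DriverOnlyContext {
    String textFile(String path) { return "contents of " + path; }
}

// A per-record function that holds a reference to the driver-only context,
// the way the anonymous PairFunction in the question refers to jsc.
class CapturesContext implements Function<String, String>, Serializable {
    private final DriverOnlyContext ctx;
    CapturesContext(DriverOnlyContext ctx) { this.ctx = ctx; }
    public String apply(String url) { return ctx.textFile(url); }
}

class ClosureSerializationDemo {
    // Tries to write the object with plain Java serialization and reports
    // whether that succeeded.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // Raised because the captured ctx field cannot be serialized
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // prints false
        System.out.println(isSerializable(new CapturesContext(new DriverOnlyContext())));
    }
}
```

The function object itself is marked Serializable, but serialization walks its fields, and the captured context is what makes it fail.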

It looks like your properties file is a constant, so why not process it at the
beginning of the job and broadcast the result?
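A sketch of that suggestion, assuming the properties file holds key=value lines. On the driver, before the streaming job starts, the lines are read once (for example with jsc.textFile(propertiesFile).collect(), which returns a java.util.List<String>) and parsed into a map. In Spark that map would then be passed to jsc.broadcast(...), which returns a Broadcast<Map<String, String>>, and the per-record function would read it with .value() instead of touching jsc. To stay runnable without Spark, the block models only the plain-Java parts: FeatureTable, FeatureLookup and RoundTrip are hypothetical names, the sample lines are made up, and a Java serialization round trip stands in for shipping the function to an executor. It also splits each line at the first '=' only, a small change from the question's split("=").

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Driver-side wiring in Spark (assumed, not compiled here):
//   Map<String, String> map = FeatureTable.parse(jsc.textFile(propertiesFile).collect());
//   Broadcast<Map<String, String>> features = jsc.broadcast(map);
//   ...inside mapToPair: features.value().get(key)
class FeatureTable {
    // Parse "key=value" lines once. split("=", 2) keeps any '=' inside the
    // value; blank lines and lines without '=' are skipped.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> features = new HashMap<>();
        for (String line : lines) {
            String[] kv = line.split("=", 2);
            if (kv.length == 2) {
                features.put(kv[0].trim(), kv[1].trim());
            }
        }
        return features;
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = Arrays.asList("f1=com.example.A", "f2=com.example.B", "");
        FeatureLookup fn = RoundTrip.copy(new FeatureLookup(parse(lines)));
        System.out.println(fn.lookup("f1")); // prints com.example.A
    }
}

// Hypothetical per-record function: it captures only the parsed map
// (HashMap is Serializable), never the SparkContext.
class FeatureLookup implements Serializable {
    private final Map<String, String> features;
    FeatureLookup(Map<String, String> features) { this.features = features; }
    String lookup(String key) { return features.get(key); }
}

class RoundTrip {
    // Serialize and deserialize an object, roughly what happens when a
    // function is sent from the driver to an executor.
    @SuppressWarnings("unchecked")
    static <T> T copy(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

Broadcasting sends the map to each executor once rather than with every task; for a small file, capturing the map directly in the function would also serialize, but it is shipped again with each task.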

On Fri, Sep 11, 2015 at 2:09 PM, Rachana Srivastava <rachana.srivast...@markmonitor.com> wrote:

> Hello all,
>
> Can we invoke JavaRDD while processing stream from Kafka for example.
> Following code is throwing some serialization exception.  Not sure if this
> is feasible.