You could have your receiver send a "magic value" when it is done. I discuss
this Spark Streaming pattern in my presentation "Spark Gotchas and
Anti-Patterns". In the PDF version, it's slides 34-36:
http://www.datascienceassn.org/content/2014-11-05-spark-gotchas-and-anti-patterns-julia-language

YouTube version cued to that place: 
http://www.youtube.com/watch?v=W5Uece_JmNs&t=23m18s   
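
To make the "magic value" idea concrete, here is a minimal, Spark-free sketch
of the control flow. It only simulates micro-batches as plain lists; the class
name, the END_OF_DATA marker, and the method names are all hypothetical and not
taken from the slides or from the code quoted below. In a real job the same
check would presumably sit inside the per-batch output action, with the
receiver storing the marker as its last record.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Spark-free simulation of the "magic value" pattern (all names hypothetical).
final class SentinelDemo {
    // Hypothetical end-of-data marker; it must never collide with a real record.
    static final String END_OF_DATA = "__END_OF_DATA__";

    private final Map<String, Integer> counts = new HashMap<String, Integer>();
    private boolean finished = false;

    // Processes one simulated micro-batch: updates the running counts and
    // returns true once the marker has been seen in this or an earlier batch.
    boolean processBatch(List<String> batch) {
        for (String record : batch) {
            if (END_OF_DATA.equals(record)) {
                finished = true;
            } else {
                counts.merge(record, 1, Integer::sum);
            }
        }
        return finished;
    }

    // Feeds batches in order and produces the final result exactly once,
    // right after the batch that carried the marker. Later batches are ignored.
    static Optional<Map<String, Integer>> run(List<List<String>> batches) {
        SentinelDemo state = new SentinelDemo();
        for (List<String> batch : batches) {
            if (state.processBatch(batch)) {
                Map<String, Integer> snapshot = new HashMap<String, Integer>(state.counts);
                return Optional.of(snapshot);
            }
        }
        return Optional.empty(); // the marker never arrived
    }
}
```

The per-batch work (counting) still runs for every batch; only the final step
is gated on the marker. The sketch assumes the marker arrives after all real
records, which a single receiver can arrange by sending it last.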

From: Hari Polisetty <hpoli...@icloud.com>
To: Tathagata Das <t...@databricks.com>
Cc: user <user@spark.apache.org>
Sent: Monday, April 6, 2015 2:02 PM
Subject: Re: How to restrict foreach on a streaming RDD only once upon
receiver completion

Yes, I’m using updateStateByKey and it works. But then I need to perform
further computation on this stateful RDD (see code snippet below). I perform
foreach on the final RDD and get the top 10 records. I just don’t want the
foreach to be performed every time a new batch is received, only when the
receiver is done fetching all the records.

My requirements are to programmatically invoke the E.S query (it varies by
use case), get all the records, apply certain transformations, and get the
top 10 results based on certain criteria back into the driver program for
further processing. I’m able to apply the transformations on the batches of
records fetched from E.S using streaming, so I don’t need to wait for all the
records to be fetched. The RDD transformations are happening all the time and
the top k results are getting updated constantly until all the records are
fetched by the receiver. Is there any drawback with this approach?

Can you give more pointers on what you mean by creating a custom RDD that reads
from ElasticSearch?

Here is the relevant portion of my Spark streaming code:
// Create a custom streaming receiver to query for relevant data from E.S.
JavaReceiverInputDStream<String> jsonStrings = ssc.receiverStream(
    new ElasticSearchResponseReceiver(query…….));
// Apply JSON paths to extract specific value(s) from each record.
JavaDStream<String> fieldVariations = jsonStrings.flatMap(
    new FlatMapFunction<String, String>() {
      private static final long serialVersionUID = 4652373457519888848L;
      @Override
      public Iterable<String> call(String jsonString) {
        List<String> r = JsonPath.read(jsonString, attributeDetail.getJsonPath());
        return r;
      }
    });
// Perform a stateful map-reduce on each variation.
JavaPairDStream<String, Integer> fieldVariationCounts = fieldVariations
    .mapToPair(new PairFunction<String, String, Integer>() {
      private static final long serialVersionUID = -1241276515559408238L;
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    })
    .updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
          private static final long serialVersionUID = 7598681835161199865L;
          public Optional<Integer> call(List<Integer> nums, Optional<Integer> current) {
            Integer sum = current.or((int) 0L);
            return (Optional<Integer>) Optional.of(sum + nums.size());
          }
        })
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
      private static final long serialVersionUID = -5906059838295609562L;
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });
// Swap the pairs from (enum string, int) to (int, enum string) so that we can
// sort on frequencies.
JavaPairDStream<Integer, String> swappedPair = fieldVariationCounts.mapToPair(
    new PairFunction<Tuple2<String, Integer>, Integer, String>() {
      private static final long serialVersionUID = -5889774695187619957L;
      @Override
      public Tuple2<Integer, String> call(Tuple2<String, Integer> item)
          throws Exception {
        return item.swap();
      }
    });
// Sort based on key, i.e., frequency.
JavaPairDStream<Integer, String> sortedCounts = swappedPair.transformToPair(
    new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {
      private static final long serialVersionUID = -4172702039963232779L;
      public JavaPairRDD<Integer, String> call(JavaPairRDD<Integer, String> in)
          throws Exception {
        // False to denote sort in descending order.
        return in.sortByKey(false);
      }
    });
// Iterate through the RDD, get the top 20 values in the sorted pair, and
// write them to the results list.
sortedCounts.foreach(new Function<JavaPairRDD<Integer, String>, Void>() {
  private static final long serialVersionUID = 2186144129973051920L;
  public Void call(JavaPairRDD<Integer, String> rdd) {
    resultList.clear();
    for (Tuple2<Integer, String> t : rdd.take(MainDriver.NUMBER_OF_TOP_VARIATIONS)) {
      resultList.add(new Tuple3<String, Integer, Double>(
          t._2(), t._1(), (double) (100 * t._1()) / totalProcessed.value()));
    }
    return null;
  }
});
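
The swap, sort, and take steps above amount to a top-k selection followed by a
percentage calculation. As a rough illustration of that logic outside Spark,
here is a sketch that uses a bounded min-heap instead of a full sort. The class
and method names are hypothetical, and the input map stands in for the final
(value, count) state; it is not a drop-in replacement for the DStream code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Top-k selection over (value, count) pairs without sorting everything.
final class TopK {
    // Returns the k entries with the highest counts, highest first.
    // The order among entries with equal counts is unspecified.
    static List<Map.Entry<String, Integer>> topK(Map<String, Integer> counts, int k) {
        List<Map.Entry<String, Integer>> result = new ArrayList<Map.Entry<String, Integer>>();
        if (k <= 0) {
            return result;
        }
        Comparator<Map.Entry<String, Integer>> byCount = Map.Entry.comparingByValue();
        // Min-heap on count: the root is always the weakest of the current top k.
        PriorityQueue<Map.Entry<String, Integer>> heap =
            new PriorityQueue<Map.Entry<String, Integer>>(k + 1, byCount);
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            heap.add(entry);
            if (heap.size() > k) {
                heap.poll(); // evict the smallest so at most k entries remain
            }
        }
        result.addAll(heap);
        result.sort(byCount.reversed());
        return result;
    }

    // Share of the total as a percentage, mirroring the 100 * count / total
    // expression in the quoted code. The caller must ensure total is non-zero.
    static double percentOf(int count, long total) {
        return 100.0 * count / total;
    }
}
```

Within Spark itself, the RDD API's takeOrdered and top methods may serve a
similar purpose without sorting the whole RDD first; check the Java API docs
for your version before relying on them.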



On Apr 7, 2015, at 1:14 AM, Tathagata Das <t...@databricks.com> wrote:
So you want to sort based on the total count of all the records received
through the receiver? In that case, you have to combine all the counts using
updateStateByKey
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala).

But stepping back, if you want to get the final results at the end of
receiving all the data (as opposed to continuously), why are you even using
streaming? You could create a custom RDD that reads from ElasticSearch and then
use it in a Spark program. I think that's more natural, as your application is
more batch-like than streaming-like, since you are not using the results in
real time.
TD
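
To illustrate the batch-like shape described above, here is a small sketch in
plain Java: all records are available up front, they are counted in one pass,
and the top k are read once at the end. It deliberately leaves out Spark and
ElasticSearch; the class name, method name, and in-memory list are hypothetical
stand-ins for whatever a custom RDD would supply.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Batch-style flow: count everything first, then rank once.
final class BatchTopK {
    // Returns the k most frequent values, most frequent first.
    // Ties are broken alphabetically so the result is deterministic.
    // k must be non-negative (Stream.limit rejects negative sizes).
    static List<String> topValues(List<String> allRecords, int k) {
        // One pass over the complete data set to build value -> frequency.
        Map<String, Long> counts = allRecords.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        // Rank a single time, after all the data has been seen.
        return counts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(k)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }
}
```

Because nothing runs per micro-batch here, there is no question of when the
final step fires: it runs once, after the input is complete.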
On Mon, Apr 6, 2015 at 12:31 PM, Hari Polisetty <hpoli...@icloud.com> wrote:

I have created a Custom Receiver to fetch records pertaining to a specific 
query from Elastic Search and have implemented Streaming RDD transformations to 
process the data generated by the receiver.

The final RDD is a sorted list of name-value pairs and I want to read the top
20 results programmatically rather than write to an external file.
I use "foreach" on the RDD and take the top 20 values into a list. I see that
foreach is processed every time there is a new microbatch from the receiver.

However, I want the foreach computation to be done only once when the receiver 
has finished fetching all the records from Elastic Search and before the 
streaming context is killed so that I can populate the results into a list and 
process it in my driver program.

Appreciate any guidance in this regard.