How to restrict foreach on a streaming RDD only once upon receiver completion

2015-04-06 Thread Hari Polisetty
I have created a Custom Receiver to fetch records pertaining to a specific 
query from Elastic Search and have implemented Streaming RDD transformations to 
process the data generated by the receiver. 

The final RDD is a sorted list of name value pairs and I want to read the top 
20 results programmatically rather than write to an external file.
I use foreach on the RDD and take the top 20 values into a list. I see that 
the foreach is processed every time there is a new microbatch from the receiver.

However, I want the foreach computation to be done only once, when the receiver 
has finished fetching all the records from Elastic Search and before the 
streaming context is killed, so that I can populate the results into a list and 
process it in my driver program. 

Appreciate any guidance in this regard.



Re: How to restrict foreach on a streaming RDD only once upon receiver completion

2015-04-06 Thread Hari Polisetty
Thanks. I’ll look into it. But the JSON string I push via the receiver goes through 
a series of transformations before it ends up in the final RDD. I need to take 
care to ensure that this magic value propagates all the way down to the last 
one that I’m iterating on.

Currently, I’m calling “stop” from the receiver once it’s done fetching all the 
records, and I have a StreamingListener that acts on it via the “onReceiverStopped” 
hook, through which I’m stopping the streamingContext. It seems to be working, 
except that I see this message:

2015-04-06 16:41:48,002 WARN [StreamingListenerBus] 
org.apache.spark.Logging$class.logWarning - All of the 
receivers have not deregistered, Map(0 -> 
ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,XYZ,))

Is this not advised? BTW I’m running in local mode.
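
For reference, a minimal sketch of that wiring, assuming the Spark 1.x Java API 
(the class name and the idea of stopping from a separate thread are illustrative, 
not from the original code):

    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.scheduler.StreamingListener;
    import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;

    // Stops the streaming context once the (single) custom receiver reports it has stopped.
    public class StopOnReceiverStopped implements StreamingListener {
        private final JavaStreamingContext ssc;

        public StopOnReceiverStopped(JavaStreamingContext ssc) {
            this.ssc = ssc;
        }

        @Override
        public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
            // Stop from a separate thread so the listener bus thread is not blocked
            // while the context shuts down.
            new Thread(new Runnable() {
                public void run() {
                    ssc.stop(true, true); // stop the SparkContext too, gracefully
                }
            }).start();
        }

        // The remaining StreamingListener callbacks (onReceiverStarted, onBatchCompleted, etc.)
        // would need empty implementations here when implementing the Scala trait from Java.
    }

    // Registered on the context before start():
    //     ssc.addStreamingListener(new StopOnReceiverStopped(ssc));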
 

 On Apr 7, 2015, at 1:43 AM, Michael Malak <michaelma...@yahoo.com> wrote:
 
 You could have your receiver send a magic value when it is done. I discuss 
 this Spark Streaming pattern in my presentation “Spark Gotchas and 
 Anti-Patterns”. In the PDF version, it's slides 34-36.
 http://www.datascienceassn.org/content/2014-11-05-spark-gotchas-and-anti-patterns-julia-language
 
 YouTube version cued to that place: 
 http://www.youtube.com/watch?v=W5Uece_JmNs&t=23m18s
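
 A rough sketch of that pattern, applied to the code later in this thread 
 (DONE_MARKER and receiverDone are illustrative names, not from the original 
 program; assumes the Spark 1.x Java API and that the marker is filtered out 
 before the JsonPath step so it never pollutes the counts):

    // (imports needed: java.util.concurrent.atomic.AtomicBoolean,
    //  org.apache.spark.api.java.JavaRDD, org.apache.spark.api.java.function.Function)

    // In the custom receiver, after the last real record has been stored:
    //     store(DONE_MARKER);   // a string that no real record can contain

    // In the driver program:
    final AtomicBoolean receiverDone = new AtomicBoolean(false);

    // Watch the raw stream for the sentinel; a non-empty batch here means the receiver is done.
    jsonStrings.filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return DONE_MARKER.equals(s);
        }
    }).foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) {
            if (rdd.count() > 0) {
                receiverDone.set(true);
            }
            return null;
        }
    });

    // In the existing foreach on sortedCounts, only materialize the top-k list
    // once the sentinel has been seen:
    //     if (receiverDone.get()) { ... take the top 20, then stop the context ... }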
  
 
 From: Hari Polisetty <hpoli...@icloud.com>
 To: Tathagata Das <t...@databricks.com>
 Cc: user <user@spark.apache.org>
 Sent: Monday, April 6, 2015 2:02 PM
 Subject: Re: How to restrict foreach on a streaming RDD only once upon 
 receiver completion
 
 Yes, I’m using updateStateByKey and it works. But then I need to perform 
 further computation on this stateful RDD (see code snippet below). I perform 
 foreach on the final RDD and get the top 10 records. I just don’t want the 
 foreach to be performed every time a new batch is received, only when the 
 receiver is done fetching all the records.
 
 My requirements are to programmatically invoke the E.S query (it varies by 
 use case), get all the records, apply certain transformations, and get the 
 top 10 results based on certain criteria back into the driver program for 
 further processing. I’m able to apply the transformations on the batches of 
 records fetched from E.S using streaming, so I don’t need to wait for all 
 the records to be fetched. The RDD transformations are happening all the time 
 and the top k results are getting updated constantly until all the records 
 are fetched by the receiver. Is there any drawback with this approach?
 
 Can you give more pointers on what you mean by creating a custom RDD that 
 reads from ElasticSearch? 
 
 Here is the relevant portion of my Spark streaming code:
 
    //Create a custom streaming receiver to query for relevant data from E.S
    JavaReceiverInputDStream<String> jsonStrings = ssc.receiverStream(
            new ElasticSearchResponseReceiver(query…….));

    //Apply JSON Paths to extract specific value(s) from each record
    JavaDStream<String> fieldVariations = jsonStrings.flatMap(new FlatMapFunction<String, String>() {
        private static final long serialVersionUID = 465237345751948L;

        @Override
        public Iterable<String> call(String jsonString) {
            List<String> r = JsonPath.read(jsonString,
                    attributeDetail.getJsonPath());
            return r;
        }
    });

    //Perform a stateful map reduce on each variation
    JavaPairDStream<String, Integer> fieldVariationCounts = fieldVariations.mapToPair(
            new PairFunction<String, String, Integer>() {
                private static final long serialVersionUID = -1241276515559408238L;

                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                private static final long serialVersionUID = 7598681835161199865L;

                public Optional<Integer> call(List<Integer> nums, Optional<Integer> current) {
                    Integer sum = current.or((int) 0L);
                    return (Optional<Integer>) Optional.of(sum + nums.size());
                }
            });

    //Iterate through the RDD and get the top 20 values in the sorted pair and write to results list
    sortedCounts.foreach(
            new Function<JavaPairRDD<Integer, String>, Void>() {
                private static final long serialVersionUID = 2186144129973051920L;

                public Void call(JavaPairRDD<Integer, String> rdd) {
                    resultList.clear();
                    for (Tuple2<Integer, String> t : rdd.take(MainDriver.NUMBER_OF_TOP_VARIATIONS)) {
                        resultList.add(new Tuple3<String, Integer, Double>(t._2(), t._1(),
                                (double) (100 * t._1()) / totalProcessed.value()));
                    }
                    return null;
                }
            }
    );

 On Apr 7, 2015, at 1:14 AM, Tathagata Das <t...@databricks.com> wrote:
 
 So you want to sort based on the total count of all the records received 
 through the receiver? In that case, you have to combine all the counts using 
 updateStateByKey 
 (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala)
 
 But stepping back, if you want to get the final results at the end of 
 receiving all the data (as opposed to continuously), why are you even using 
 streaming? You could create a custom RDD that reads from ElasticSearch and 
 then use it in a Spark program. I think that's more natural, as your 
 application is more batch-like than streaming-like: you only need the 
 results once all the data has been received.
 
 TD
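
 A rough sketch of that batch-style alternative (fetchAllHits, jsonPathExpr and 
 the top-20 pipeline below are illustrative assumptions, not TD's or Hari's code; 
 the Elasticsearch results could equally come from the elasticsearch-hadoop 
 connector):

    // (imports needed: java.util.List, scala.Tuple2, com.jayway.jsonpath.JsonPath,
    //  org.apache.spark.api.java.*, org.apache.spark.api.java.function.*)

    // Fetch the full result set for the query in the driver, e.g. via the ES scroll API
    // (fetchAllHits is a hypothetical helper), then process it as an ordinary batch job.
    // sc is a JavaSparkContext; jsonPathExpr is the JSON path to extract.
    List<String> hits = fetchAllHits(query);
    JavaRDD<String> jsonStrings = sc.parallelize(hits);

    List<Tuple2<Integer, String>> top20 = jsonStrings
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String json) {
                List<String> values = JsonPath.read(json, jsonPathExpr);
                return values;
            }
        })
        .mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        })
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        })
        .mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) {
                return t.swap();
            }
        })
        .sortByKey(false)   // descending by count
        .take(20);          // top 20 straight into the driver, no streaming context needed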
 
 On Mon, Apr 6, 2015 at 12:31 PM, Hari Polisetty <hpoli...@icloud.com> wrote:
 I have created a Custom Receiver to fetch records pertaining to a specific 
 query from Elastic Search and have implemented Streaming RDD transformations 
 to process the data generated by the receiver.
 
 The final RDD is a sorted list of name value pairs and I want to read the top 
 20 results programmatically rather than write to an external file.
 I use foreach on the RDD and take the top 20 values into a list. I see that 
 forEach is processed every time there is a new microbatch from the receiver.
 
 However, I want the foreach computation to be done only once when the 
 receiver has finished fetching all the records from Elastic Search and before 
 the streaming context is killed so that I can populate the results into a 
 list and process it in my driver program.
 
 Appreciate any guidance in this regard.
 
 



Seeing message about receiver not being de-registered on invoking Streaming context stop

2015-04-06 Thread Hari Polisetty

 My application is running Spark in local mode and I have a Spark StreamingListener 
as well as a Custom Receiver. When the receiver is done fetching all documents, it 
invokes “stop” on itself.
I see the StreamingListener getting a callback on “onReceiverStopped” where I 
stop the streaming context.

However, I see the following message in my logs:

2015-04-06 16:41:51,193 WARN [Thread-66] 
com.amazon.grcs.gapanalysis.spark.streams.ElasticSearchResponseReceiver.onStop 
- Stopped receiver
2015-04-06 16:41:51,193 ERROR [sparkDriver-akka.actor.default-dispatcher-17] 
org.apache.spark.Logging$class.logError - Deregistered receiver for stream 0: 
AlHURLEY
2015-04-06 16:41:51,202 WARN [Executor task launch worker-2] 
org.apache.spark.Logging$class.logWarning - Stopped executor without error
2015-04-06 16:41:51,203 WARN [StreamingListenerBus] 
org.apache.spark.Logging$class.logWarning - All of the receivers have not 
deregistered, Map(0 -> 
ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,HURLEY))

What am I missing or doing wrong?