This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of
AsyncRDDActions in Java ?

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj <gautam1...@gmail.com> wrote:

> I am received data at UDP port 8060 and doing processing on it using Spark
> and storing the output in Neo4j.
>
> But the data I'm receiving and the data that is getting stored doesn't
> match probably because Neo4j API takes too long to push the data into
> database. Meanwhile, Spark is unable to receive data probably because the
> process is blocked.
>
> On Thu, May 21, 2015 at 5:28 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> Can you elaborate on how the data loss is occurring?
>>
>>
>> On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj <gautam1...@gmail.com>
>> wrote:
>>
>>> That is completely alright, as the system will make sure the works get
>>> done.
>>>
>>> My major concern is, the data drop. Will using async stop data loss?
>>>
>>> On Thu, May 21, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> If you cannot push data as fast as you are generating it, then async
>>>> isnt going to help either. The "work" is just going to keep piling up as
>>>> many many async jobs even though your batch processing times will be low as
>>>> that processing time is not going to reflect how much of overall work is
>>>> pending in the system.
>>>>
>>>> On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj <gautam1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> From my understanding of Spark Streaming, I created a spark entry
>>>>> point, for continuous UDP data, using:
>>>>>
>>>>> SparkConf conf = new 
>>>>> SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
>>>>>  jssc = new JavaStreamingContext(conf, new 
>>>>> Duration(10000));JavaReceiverInputDStream<String> lines = 
>>>>> jssc.receiverStream(new CustomReceiver(8060));
>>>>>
>>>>> Now, when I process this input stream using:
>>>>>
>>>>> JavaDStream hash=lines.flatMap(<my-code>)JavaPairDStream tuple= 
>>>>> hash.mapToPair(<my-code>)JavaPairDStream output= 
>>>>> tuple.reduceByKey(<my-code>)
>>>>> output.foreachRDD(
>>>>>                 new 
>>>>> Function2<JavaPairRDD<String,ArrayList<String>>,Time,Void>(){
>>>>>                     @Override
>>>>>                     public Void call(
>>>>>                             JavaPairRDD<String, ArrayList<String>> arg0,
>>>>>                             Time arg1) throws Exception {
>>>>>                         // TODO Auto-generated method stub
>>>>>                         new AsyncRDDActions(arg0.rdd(), null);
>>>>>                         arg0.foreachPartition(
>>>>>                                 new 
>>>>> VoidFunction<Iterator<Tuple2<String,ArrayList<String>>>>(){
>>>>>
>>>>>                                     @Override
>>>>>                                     public void call(
>>>>>                                             Iterator<Tuple2<String, 
>>>>> ArrayList<String>>> arg0)
>>>>>                                             throws Exception {
>>>>>
>>>>>                                         // TODO Auto-generated method stub
>>>>>                                         GraphDatabaseService graphDb = 
>>>>> new 
>>>>> GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>>>>>                                                 
>>>>> .setConfig("remote_shell_enabled", "true")
>>>>>                                                 .newGraphDatabase();
>>>>>
>>>>>                                         try (Transaction tx = 
>>>>> graphDb.beginTx()) {
>>>>>                                             while (arg0.hasNext()) {
>>>>>                                                 Tuple2 < String, 
>>>>> ArrayList < String >> tuple = arg0.next();
>>>>>                                                 Node 
>>>>> HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>>>>>                                                 boolean oldHMac=false;
>>>>>                                                 if (HMac!= null){
>>>>>                                                     
>>>>> System.out.println("Alread in Database:" + tuple._1);
>>>>>                                                     oldHMac=true;
>>>>>                                                 }
>>>>>                                                 else
>>>>>                                                     
>>>>> HMac=Neo4jOperations.createHMac(graphDb, tuple._1);
>>>>>
>>>>>                                                 ArrayList<String> 
>>>>> zipcodes=tuple._2;
>>>>>                                                 for(String zipcode : 
>>>>> zipcodes){
>>>>>                                                     Node 
>>>>> Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
>>>>>                                                     if(Zipcode!=null){
>>>>>                                                         
>>>>> System.out.println("Already in Database:" + zipcode);
>>>>>                                                         if(oldHMac==true 
>>>>> && Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null)
>>>>>                                                             
>>>>> Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
>>>>>                                                         else
>>>>>                                                             
>>>>> Neo4jOperations.travelTo(HMac, Zipcode);
>>>>>                                                     }
>>>>>                                                     else{
>>>>>                                                         
>>>>> Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode);
>>>>>                                                         
>>>>> Neo4jOperations.travelTo(HMac, Zipcode);
>>>>>                                                     }
>>>>>                                                 }
>>>>>                                             }
>>>>>                                             tx.success();
>>>>>                                         }
>>>>>                                         graphDb.shutdown();
>>>>>                                     }
>>>>>                         });
>>>>>                         return null;
>>>>>                     }
>>>>>                 });
>>>>>
>>>>> The part of code in output.foreachRDD pushes the output of spark into
>>>>> Neo4j Database. Checking for duplicates values.
>>>>>
>>>>> This part of code is very time consuming because of which my
>>>>> processing time exceeds batch time. Because of that, it *result in
>>>>> dataloss*. So, I was thinking of pushing the output into the database
>>>>> asynchronously.
>>>>> I found AsyncRDDActions(
>>>>> https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html)
>>>>> for this purpose, but cannot find a working example for that in Java.
>>>>> Especially, the function foreachPatitionAsync inside which we have to use
>>>>> "Function1"
>>>>>
>>>>> Any help is appreciated.
>>>>>
>>>>> Thanks,
>>>>> Gautam
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Gautam
>>>
>>
>>
>
>
> --
> Gautam
>



-- 
Gautam

Reply via email to