Can you elaborate on how the data loss is occurring?

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:

> That is completely alright, as the system will make sure the works get
> done.
>
> My major concern is, the data drop. Will using async stop data loss?
>
> On Thu, May 21, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> If you cannot push data as fast as you are generating it, then async isnt
>> going to help either. The "work" is just going to keep piling up as many
>> many async jobs even though your batch processing times will be low as that
>> processing time is not going to reflect how much of overall work is pending
>> in the system.
>>
>> On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj <gautam1...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> From my understanding of Spark Streaming, I created a spark entry point,
>>> for continuous UDP data, using:
>>>
>>> SparkConf conf = new 
>>> SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
>>>  jssc = new JavaStreamingContext(conf, new 
>>> Duration(10000));JavaReceiverInputDStream<String> lines = 
>>> jssc.receiverStream(new CustomReceiver(8060));
>>>
>>> Now, when I process this input stream using:
>>>
>>> JavaDStream hash=lines.flatMap(<my-code>)JavaPairDStream tuple= 
>>> hash.mapToPair(<my-code>)JavaPairDStream output= 
>>> tuple.reduceByKey(<my-code>)
>>> output.foreachRDD(
>>>                 new 
>>> Function2<JavaPairRDD<String,ArrayList<String>>,Time,Void>(){
>>>                     @Override
>>>                     public Void call(
>>>                             JavaPairRDD<String, ArrayList<String>> arg0,
>>>                             Time arg1) throws Exception {
>>>                         // TODO Auto-generated method stub
>>>                         new AsyncRDDActions(arg0.rdd(), null);
>>>                         arg0.foreachPartition(
>>>                                 new 
>>> VoidFunction<Iterator<Tuple2<String,ArrayList<String>>>>(){
>>>
>>>                                     @Override
>>>                                     public void call(
>>>                                             Iterator<Tuple2<String, 
>>> ArrayList<String>>> arg0)
>>>                                             throws Exception {
>>>
>>>                                         // TODO Auto-generated method stub
>>>                                         GraphDatabaseService graphDb = new 
>>> GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>>>                                                 
>>> .setConfig("remote_shell_enabled", "true")
>>>                                                 .newGraphDatabase();
>>>
>>>                                         try (Transaction tx = 
>>> graphDb.beginTx()) {
>>>                                             while (arg0.hasNext()) {
>>>                                                 Tuple2 < String, ArrayList 
>>> < String >> tuple = arg0.next();
>>>                                                 Node 
>>> HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>>>                                                 boolean oldHMac=false;
>>>                                                 if (HMac!= null){
>>>                                                     
>>> System.out.println("Alread in Database:" + tuple._1);
>>>                                                     oldHMac=true;
>>>                                                 }
>>>                                                 else
>>>                                                     
>>> HMac=Neo4jOperations.createHMac(graphDb, tuple._1);
>>>
>>>                                                 ArrayList<String> 
>>> zipcodes=tuple._2;
>>>                                                 for(String zipcode : 
>>> zipcodes){
>>>                                                     Node 
>>> Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
>>>                                                     if(Zipcode!=null){
>>>                                                         
>>> System.out.println("Already in Database:" + zipcode);
>>>                                                         if(oldHMac==true && 
>>> Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null)
>>>                                                             
>>> Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
>>>                                                         else
>>>                                                             
>>> Neo4jOperations.travelTo(HMac, Zipcode);
>>>                                                     }
>>>                                                     else{
>>>                                                         
>>> Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode);
>>>                                                         
>>> Neo4jOperations.travelTo(HMac, Zipcode);
>>>                                                     }
>>>                                                 }
>>>                                             }
>>>                                             tx.success();
>>>                                         }
>>>                                         graphDb.shutdown();
>>>                                     }
>>>                         });
>>>                         return null;
>>>                     }
>>>                 });
>>>
>>> The part of code in output.foreachRDD pushes the output of spark into
>>> Neo4j Database. Checking for duplicates values.
>>>
>>> This part of code is very time consuming because of which my processing
>>> time exceeds batch time. Because of that, it *result in dataloss*. So,
>>> I was thinking of pushing the output into the database asynchronously.
>>> I found AsyncRDDActions(
>>> https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html)
>>> for this purpose, but cannot find a working example for that in Java.
>>> Especially, the function foreachPatitionAsync inside which we have to use
>>> "Function1"
>>>
>>> Any help is appreciated.
>>>
>>> Thanks,
>>> Gautam
>>>
>>
>>
>
>
> --
> Gautam
>

Reply via email to