This is a friendly ping to remind you of my query. Also, could you give an explanation or example of how to use AsyncRDDActions in Java?
On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj <gautam1...@gmail.com> wrote:

> I am receiving data at UDP port 8060, processing it with Spark, and storing
> the output in Neo4j.
>
> But the data I'm receiving and the data that is getting stored don't match,
> probably because the Neo4j API takes too long to push the data into the
> database. Meanwhile, Spark is unable to receive data, probably because the
> process is blocked.
>
> On Thu, May 21, 2015 at 5:28 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Can you elaborate on how the data loss is occurring?
>>
>> On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:
>>
>>> That is completely alright, as the system will make sure the work gets
>>> done.
>>>
>>> My major concern is the data drop. Will using async stop data loss?
>>>
>>> On Thu, May 21, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com> wrote:
>>>
>>>> If you cannot push data as fast as you are generating it, then async
>>>> isn't going to help either. The "work" is just going to keep piling up
>>>> as many async jobs. Your batch processing times will be low, but that
>>>> processing time will not reflect how much overall work is pending in
>>>> the system.
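[The pile-up problem described above can be illustrated outside of Spark with plain Java. This is a hypothetical sketch, not code from the thread: a writer pool with a bounded queue and CallerRunsPolicy makes the submitting thread slow down when the (simulated) database writers fall behind, instead of queueing work without limit the way fire-and-forget async submission does.]

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsyncWrites {
    public static void main(String[] args) throws Exception {
        AtomicInteger written = new AtomicInteger();

        // Two writer threads and a queue of at most 4 pending batches.
        // CallerRunsPolicy: when the queue is full, the submitting thread
        // runs the task itself, which naturally throttles submission
        // (backpressure) instead of letting work pile up unboundedly.
        ThreadPoolExecutor writers = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(4),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int batch = 0; batch < 20; batch++) {
            writers.execute(() -> {
                try {
                    Thread.sleep(10); // simulate a slow database write
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                written.incrementAndGet();
            });
        }

        writers.shutdown();
        writers.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("batches written: " + written.get());
    }
}
```

[With an unbounded queue and no blocking policy, all 20 batches would be accepted instantly regardless of writer speed, which is the "work keeps piling up" situation described above.]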
>>>> On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj <gautam1...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> From my understanding of Spark Streaming, I created a Spark entry
>>>>> point for continuous UDP data using:
>>>>>
>>>>> SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
>>>>> JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));
>>>>> JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));
>>>>>
>>>>> Now, when I process this input stream using:
>>>>>
>>>>> JavaDStream hash = lines.flatMap(<my-code>);
>>>>> JavaPairDStream tuple = hash.mapToPair(<my-code>);
>>>>> JavaPairDStream output = tuple.reduceByKey(<my-code>);
>>>>> output.foreachRDD(
>>>>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>>>>         @Override
>>>>>         public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1)
>>>>>                 throws Exception {
>>>>>             new AsyncRDDActions(arg0.rdd(), null);
>>>>>             arg0.foreachPartition(
>>>>>                 new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
>>>>>                     @Override
>>>>>                     public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0)
>>>>>                             throws Exception {
>>>>>                         GraphDatabaseService graphDb = new GraphDatabaseFactory()
>>>>>                             .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>>>>>                             .setConfig("remote_shell_enabled", "true")
>>>>>                             .newGraphDatabase();
>>>>>                         try (Transaction tx = graphDb.beginTx()) {
>>>>>                             while (arg0.hasNext()) {
>>>>>                                 Tuple2<String, ArrayList<String>> tuple = arg0.next();
>>>>>                                 Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>>>>>                                 boolean oldHMac = false;
>>>>>                                 if (HMac != null) {
>>>>>                                     System.out.println("Already in database: " + tuple._1);
>>>>>                                     oldHMac = true;
>>>>>                                 } else
>>>>>                                     HMac = Neo4jOperations.createHMac(graphDb, tuple._1);
>>>>>                                 ArrayList<String> zipcodes = tuple._2;
>>>>>                                 for (String zipcode : zipcodes) {
>>>>>                                     Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
>>>>>                                     if (Zipcode != null) {
>>>>>                                         System.out.println("Already in database: " + zipcode);
>>>>>                                         if (oldHMac && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
>>>>>                                             Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
>>>>>                                         else
>>>>>                                             Neo4jOperations.travelTo(HMac, Zipcode);
>>>>>                                     } else {
>>>>>                                         Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
>>>>>                                         Neo4jOperations.travelTo(HMac, Zipcode);
>>>>>                                     }
>>>>>                                 }
>>>>>                             }
>>>>>                             tx.success();
>>>>>                         }
>>>>>                         graphDb.shutdown();
>>>>>                     }
>>>>>                 });
>>>>>             return null;
>>>>>         }
>>>>>     });
>>>>>
>>>>> The code inside output.foreachRDD pushes the output of Spark into the
>>>>> Neo4j database, checking for duplicate values.
>>>>>
>>>>> This part of the code is very time-consuming, which makes my processing
>>>>> time exceed the batch time and *results in data loss*. So I was thinking
>>>>> of pushing the output into the database asynchronously. I found
>>>>> AsyncRDDActions
>>>>> (https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html)
>>>>> for this purpose, but cannot find a working example for it in Java,
>>>>> especially for the function foreachPartitionAsync, inside which we have
>>>>> to use "Function1".
>>>>>
>>>>> Any help is appreciated.
>>>>>
>>>>> Thanks,
>>>>> Gautam
>>>>
>>>
>>> --
>>> Gautam
>>
>
> --
> Gautam

--
Gautam
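[For reference, a minimal sketch of the async pattern asked about above. It assumes a Spark version where the Java API exposes foreachPartitionAsync directly on JavaRDD (returning a JavaFutureAction), which sidesteps constructing the Scala AsyncRDDActions/Function1 wrapper by hand. The data and the per-partition body here are placeholders, not the Neo4j logic from this thread.]

```java
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class ForeachPartitionAsyncExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("AsyncSketch");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        // foreachPartitionAsync returns immediately; the Spark job runs in
        // the background and the future tracks its completion.
        JavaFutureAction<Void> future = rdd.foreachPartitionAsync(
            new VoidFunction<Iterator<Integer>>() {
                @Override
                public void call(Iterator<Integer> partition) {
                    while (partition.hasNext()) {
                        // placeholder for the per-partition database write
                        System.out.println("writing " + partition.next());
                    }
                }
            });

        future.get(); // block only when completion matters (also: cancel(), isDone())
        System.out.println("async job done");
        sc.stop();
    }
}
```

[Note the caveat raised earlier in the thread still applies: if each batch's writes take longer than the batch interval, async submission only hides the delay behind a growing backlog of pending jobs; it does not prevent data loss by itself.]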