Can you elaborate on how the data loss is occurring?
On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:

> That is completely alright, as the system will make sure the work gets
> done.
>
> My major concern is the data drop. Will using async stop data loss?
>
> On Thu, May 21, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> If you cannot push data as fast as you are generating it, then async isn't
>> going to help either. The "work" is just going to keep piling up as many,
>> many async jobs, even though your batch processing times will be low, as that
>> processing time is not going to reflect how much of the overall work is
>> pending in the system.
>>
>> On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj <gautam1...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> From my understanding of Spark Streaming, I created a Spark entry point
>>> for continuous UDP data, using:
>>>
>>> SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
>>> JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));
>>> JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));
>>>
>>> Now, when I process this input stream using:
>>>
>>> JavaDStream hash = lines.flatMap(<my-code>)
>>> JavaPairDStream tuple = hash.mapToPair(<my-code>)
>>> JavaPairDStream output = tuple.reduceByKey(<my-code>)
>>>
>>> output.foreachRDD(
>>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>>         @Override
>>>         public Void call(
>>>                 JavaPairRDD<String, ArrayList<String>> arg0,
>>>                 Time arg1) throws Exception {
>>>             // TODO Auto-generated method stub
>>>             new AsyncRDDActions(arg0.rdd(), null);
>>>             arg0.foreachPartition(
>>>                 new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
>>>
>>>                     @Override
>>>                     public void call(
>>>                             Iterator<Tuple2<String, ArrayList<String>>> arg0)
>>>                             throws Exception {
>>>
>>>                         // TODO Auto-generated method stub
>>>                         GraphDatabaseService graphDb = new GraphDatabaseFactory()
>>>                             .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>>>                             .setConfig("remote_shell_enabled", "true")
>>>                             .newGraphDatabase();
>>>
>>>                         try (Transaction tx = graphDb.beginTx()) {
>>>                             while (arg0.hasNext()) {
>>>                                 Tuple2<String, ArrayList<String>> tuple = arg0.next();
>>>                                 Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>>>                                 boolean oldHMac = false;
>>>                                 if (HMac != null) {
>>>                                     System.out.println("Alread in Database:" + tuple._1);
>>>                                     oldHMac = true;
>>>                                 }
>>>                                 else
>>>                                     HMac = Neo4jOperations.createHMac(graphDb, tuple._1);
>>>
>>>                                 ArrayList<String> zipcodes = tuple._2;
>>>                                 for (String zipcode : zipcodes) {
>>>                                     Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
>>>                                     if (Zipcode != null) {
>>>                                         System.out.println("Already in Database:" + zipcode);
>>>                                         if (oldHMac == true &&
>>>                                                 Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
>>>                                             Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
>>>                                         else
>>>                                             Neo4jOperations.travelTo(HMac, Zipcode);
>>>                                     }
>>>                                     else {
>>>                                         Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
>>>                                         Neo4jOperations.travelTo(HMac, Zipcode);
>>>                                     }
>>>                                 }
>>>                             }
>>>                             tx.success();
>>>                         }
>>>                         graphDb.shutdown();
>>>                     }
>>>                 });
>>>             return null;
>>>         }
>>>     });
>>>
>>> The code in output.foreachRDD pushes the output of Spark into a Neo4j
>>> database, checking for duplicate values.
>>>
>>> This part of the code is very time consuming, because of which my
>>> processing time exceeds the batch time. Because of that, it *results in
>>> data loss*. So I was thinking of pushing the output into the database
>>> asynchronously. I found AsyncRDDActions
>>> (https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html)
>>> for this purpose, but cannot find a working example for it in Java,
>>> especially for the function foreachPartitionAsync, inside which we have
>>> to use "Function1".
>>>
>>> Any help is appreciated.
>>>
>>> Thanks,
>>> Gautam
>>>
>>
>
> --
> Gautam
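
The point made in the quoted reply, that asynchronous submission does not help when the sink is slower than the source, can be illustrated with a small back-of-the-envelope simulation. This is only a sketch under stated assumptions: it uses no Spark or Neo4j API, the class and method names (BacklogSketch, backlogMs) are hypothetical, and it assumes a single writer that needs a fixed time per batch. The 10000 ms interval matches the Duration in the original code; the 15000 ms write time is an invented figure.

```java
import java.util.Arrays;

// Hypothetical illustration only: none of these names come from Spark or Neo4j.
final class BacklogSketch {

    /**
     * Simulates one writer that needs writeTimeMs per batch while a new batch
     * arrives every batchIntervalMs. For each batch, returns how many
     * milliseconds of write work are still outstanding at the moment the
     * next batch arrives.
     */
    static long[] backlogMs(long batchIntervalMs, long writeTimeMs, int batches) {
        long[] backlog = new long[batches];
        long writerFreeAt = 0L; // time at which all queued writes will be finished
        for (int i = 0; i < batches; i++) {
            long arrival = i * batchIntervalMs;
            // A write can only start once the batch exists and the writer is idle.
            long start = Math.max(arrival, writerFreeAt);
            writerFreeAt = start + writeTimeMs;
            long nextArrival = arrival + batchIntervalMs;
            backlog[i] = Math.max(0L, writerFreeAt - nextArrival);
        }
        return backlog;
    }

    public static void main(String[] args) {
        // Assumed figures: 10 s batches, 15 s of database writes per batch.
        System.out.println(Arrays.toString(backlogMs(10_000L, 15_000L, 4)));
        // Writes faster than the batch interval: no backlog accumulates.
        System.out.println(Arrays.toString(backlogMs(10_000L, 5_000L, 4)));
    }
}
```

With writes slower than the batch interval, the outstanding work grows by the difference (here 5 seconds) on every batch, whether the writes are submitted synchronously or asynchronously. Making the write itself cheaper is what changes the picture.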