Hi,

From my understanding of Spark Streaming, I created a Spark entry point for continuous UDP data using:
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));
JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

Now, when I process this input stream using:

JavaDStream hash = lines.flatMap(<my-code>);
JavaPairDStream tuple = hash.mapToPair(<my-code>);
JavaPairDStream output = tuple.reduceByKey(<my-code>);

output.foreachRDD(new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
        new AsyncRDDActions(arg0.rdd(), null);
        arg0.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
            @Override
            public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
                GraphDatabaseService graphDb = new GraphDatabaseFactory()
                        .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
                        .setConfig("remote_shell_enabled", "true")
                        .newGraphDatabase();
                try (Transaction tx = graphDb.beginTx()) {
                    while (arg0.hasNext()) {
                        Tuple2<String, ArrayList<String>> tuple = arg0.next();
                        Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                        boolean oldHMac = false;
                        if (HMac != null) {
                            System.out.println("Already in database: " + tuple._1);
                            oldHMac = true;
                        } else {
                            HMac = Neo4jOperations.createHMac(graphDb, tuple._1);
                        }
                        ArrayList<String> zipcodes = tuple._2;
                        for (String zipcode : zipcodes) {
                            Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
                            if (Zipcode != null) {
                                System.out.println("Already in database: " + zipcode);
                                if (oldHMac && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
                                    Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
                                else
                                    Neo4jOperations.travelTo(HMac, Zipcode);
                            } else {
                                Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
                                Neo4jOperations.travelTo(HMac, Zipcode);
                            }
                        }
                    }
                    tx.success();
                }
                graphDb.shutdown();
            }
        });
        return null;
    }
});

The code inside output.foreachRDD pushes the output of Spark into the Neo4j database, checking for duplicate values. This part of the code is very time-consuming, so my processing time exceeds the batch interval, which *results in data loss*. So I was thinking of pushing the output into the database asynchronously. I found AsyncRDDActions (https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html) for this purpose, but cannot find a working example of it in Java, especially of the function foreachPartitionAsync, inside which we have to use "Function1".

Any help is appreciated.

Thanks,
Gautam
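To make the question concrete, here is the kind of hand-off I am after, sketched without any Spark or Neo4j dependency (this is plain Java illustrating the pattern, not Spark's actual implementation): the per-partition write is submitted to a background thread via an ExecutorService, and the caller gets a Future back instead of blocking the batch. The list sink stands in for the Neo4j transaction loop above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncPartitionSketch {

    // Submits the per-partition write to a background thread and returns
    // immediately with a Future, analogous to how foreachPartitionAsync
    // returns a future action instead of blocking until completion.
    static Future<List<String>> writePartitionAsync(ExecutorService pool,
                                                    final Iterator<String> partition) {
        return pool.submit(() -> {
            List<String> sink = new ArrayList<>();
            while (partition.hasNext()) {
                // Real code would open a Neo4j transaction here and
                // merge HMac/Zipcode nodes; we just collect the records.
                sink.add(partition.next());
            }
            return sink;
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Iterator<String> partition = Arrays.asList("hmac1", "hmac2").iterator();

        Future<List<String>> action = writePartitionAsync(pool, partition);
        // The streaming batch could proceed here; block only when
        // completion actually matters.
        System.out.println("wrote " + action.get().size() + " records"); // prints "wrote 2 records"
        pool.shutdown();
    }
}
```

If I have read the docs right, Spark 1.2's Java API also adds foreachPartitionAsync directly on JavaRDD (taking a VoidFunction<Iterator<T>> and returning a JavaFutureAction<Void>), which would avoid wrapping a Scala Function1 by hand, but I have not found a worked Java example of either route.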