Hi,

>From my understanding of Spark Streaming, I created a spark entry point,
for continuous UDP data, using:

SparkConf conf = new
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
jssc = new JavaStreamingContext(conf, new
Duration(10000));JavaReceiverInputDStream<String> lines =
jssc.receiverStream(new CustomReceiver(8060));

Now, when I process this input stream using:

JavaDStream hash=lines.flatMap(<my-code>)JavaPairDStream tuple=
hash.mapToPair(<my-code>)JavaPairDStream output=
tuple.reduceByKey(<my-code>)
output.foreachRDD(
                new
Function2<JavaPairRDD<String,ArrayList<String>>,Time,Void>(){
                    @Override
                    public Void call(
                            JavaPairRDD<String, ArrayList<String>> arg0,
                            Time arg1) throws Exception {
                        // TODO Auto-generated method stub
                        new AsyncRDDActions(arg0.rdd(), null);
                        arg0.foreachPartition(
                                new
VoidFunction<Iterator<Tuple2<String,ArrayList<String>>>>(){

                                    @Override
                                    public void call(
                                            Iterator<Tuple2<String,
ArrayList<String>>> arg0)
                                            throws Exception {

                                        // TODO Auto-generated method stub
                                        GraphDatabaseService graphDb =
new 
GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")

.setConfig("remote_shell_enabled", "true")
                                                .newGraphDatabase();

                                        try (Transaction tx =
graphDb.beginTx()) {
                                            while (arg0.hasNext()) {
                                                Tuple2 < String,
ArrayList < String >> tuple = arg0.next();
                                                Node
HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                                                boolean oldHMac=false;
                                                if (HMac!= null){

System.out.println("Alread in Database:" + tuple._1);
                                                    oldHMac=true;
                                                }
                                                else

HMac=Neo4jOperations.createHMac(graphDb, tuple._1);

                                                ArrayList<String>
zipcodes=tuple._2;
                                                for(String zipcode : zipcodes){
                                                    Node
Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
                                                    if(Zipcode!=null){

System.out.println("Already in Database:" + zipcode);

if(oldHMac==true && Neo4jOperations.getRelationshipBetween(HMac,
Zipcode)!=null)

Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
                                                        else

Neo4jOperations.travelTo(HMac, Zipcode);
                                                    }
                                                    else{

Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode);

Neo4jOperations.travelTo(HMac, Zipcode);
                                                    }
                                                }
                                            }
                                            tx.success();
                                        }
                                        graphDb.shutdown();
                                    }
                        });
                        return null;
                    }
                });

The part of code in output.foreachRDD pushes the output of spark into Neo4j
Database. Checking for duplicates values.

This part of code is very time consuming because of which my processing
time exceeds batch time. Because of that, it *result in dataloss*. So, I
was thinking of pushing the output into the database asynchronously.
I found AsyncRDDActions(
https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html)
for this purpose, but cannot find a working example for that in Java.
Especially, the function foreachPatitionAsync inside which we have to use
"Function1"

Any help is appreciated.

Thanks,
Gautam

Reply via email to