RE: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Evo Eftimov
… and measure 4 is to implement a custom Feedback Loop, e.g. to monitor the 
amount of free RAM and the number of queued jobs and automatically decrease the 
message consumption rate of the Receiver until the number of clogged RDDs and 
jobs subsides (again, here you artificially decrease your performance in the 
name of the reliability/integrity of your system, i.e. not losing messages)
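
A very rough illustration of the free-RAM half of such a feedback loop. Everything 
here is an assumption rather than something from this thread: the 
ThrottledUdpReceiver name, the threshold, and the sleep interval are illustrative, 
and the queued-jobs half of the loop is not shown. The idea is simply a custom 
receiver that backs off before storing more records while the executor JVM is 
short on heap.

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

// Illustrative only: a receiver that slows its own consumption while free
// heap is low, so queued batches get a chance to drain before more data is stored.
public class ThrottledUdpReceiver extends Receiver<String> {
    private static final long MIN_FREE_BYTES = 256L * 1024 * 1024; // assumed threshold

    public ThrottledUdpReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!isStopped()) {
                    // Feedback loop: back off while the JVM is short on free heap.
                    while (Runtime.getRuntime().freeMemory() < MIN_FREE_BYTES && !isStopped()) {
                        try { Thread.sleep(500); } catch (InterruptedException e) { return; }
                    }
                    String record = readOneUdpPacket(); // placeholder for the real UDP read
                    if (record != null) {
                        store(record);
                    }
                }
            }
        }).start();
    }

    @Override
    public void onStop() { /* the loop above exits once isStopped() is true */ }

    private String readOneUdpPacket() {
        return null; // stand-in; the real receiver would read from the UDP socket here
    }
}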

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 9:39 PM
To: 'Tathagata Das'; 'Gautam Bajaj'
Cc: 'user'
Subject: RE: Storing spark processed output to Database asynchronously.

 

If your message consumption rate is higher than your processing rate – i.e. 
processing ALL the data of a micro batch (the next RDD produced for your stream) 
takes longer than the batch interval – the following happens; let's say that e.g. 
your micro batch time is 3 sec:

 

1.   Based on your message streaming and consumption rate, you get e.g. a 
500 MB RDD to be processed during the next 3 sec micro batch 

2.   However the work performed on the RDD by your streaming job takes more 
than 3 sec

3.   In the meantime the next RDD comes in and occupies another 500 MB, and so 
on and so forth, until the current iteration of the job crashes due to what is 
essentially memory exhaustion (no more free RAM for the next RDD) – in effect a 
memory leak

 

The above can be called a design flaw, because Spark Streaming seems to rely on 
the default behavior of Spark batch processing, which is to evict in-memory-only 
RDDs when there is no more free memory in the system. In a batch context Spark 
can always recreate an evicted RDD from e.g. the file system, while in a 
streaming context the data is gone forever.

 

You can check whether the above behavior is the reason for your lost messages 
by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to 
see whether your streaming app has any LOST JOBS and how many – each lost job 
is a lost RDD, and each lost RDD means lost messages.

 

The above can be overcome by using one of the following measures:

 

1.   Set the Receiver rate to a level which will allow your job to complete 
within the micro-batch interval (obviously you are voluntarily limiting your 
performance in this way) – a configuration sketch follows this list

2.   Throw more boxes/cores/RAM at the problem and also improve the performance 
of the tasks performing the work on the messages (e.g. review and refactor the 
code)

3.   Set the Storage Level of the RDDs to “Memory AND Disk” – this will keep 
using RAM while there is free space and then spill to disk rather than crashing 
miserably and losing the affected job iteration and all its messages – obviously 
every time it has to resort to the disk your performance will take a hit
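
A minimal configuration sketch for measures 1 and 3, assuming a custom receiver 
like the one used later in this thread; the UdpReceiver name, the port, the rate 
value and the batch interval are illustrative assumptions, not details from the 
thread.

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;

public class RateLimitedSetup {

    // Measure 3: a receiver whose blocks are stored as "Memory AND Disk",
    // so excess data spills to disk instead of being evicted when RAM runs out.
    // (UdpReceiver is a stand-in for the CustomReceiver in this thread.)
    public static class UdpReceiver extends Receiver<String> {
        public UdpReceiver(int port) {
            super(StorageLevel.MEMORY_AND_DISK_2());
        }
        @Override public void onStart() { /* open the UDP socket and call store(...) */ }
        @Override public void onStop()  { /* close the socket */ }
    }

    public static void main(String[] args) {
        // Measure 1: cap how many records per second each receiver may ingest.
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("NetworkWordCount")
                .set("spark.streaming.receiver.maxRate", "5000"); // illustrative value

        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(3000));
        JavaReceiverInputDStream<String> lines = jssc.receiverStream(new UdpReceiver(8060));
        // ... build the rest of the pipeline, then jssc.start() and jssc.awaitTermination()
    }
}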

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

 

Something does not make sense. Receivers (currently) do not get blocked 
(unless a rate limit has been set) due to processing load. The receiver will 
continue to receive data and store it in memory until it is processed. So I 
am still not sure how the data loss is happening, unless you are sending data 
at a faster rate than the receiver can handle (that is, more than the maximum 
rate at which the receiver can save data in memory and replicate it to other nodes). 

 

In general, if you are particular about data loss, then UDP is not really a 
good choice in the first place. If you can use TCP, try it. It would at 
least eliminate the possibility that I mentioned above. Ultimately, if you try 
sending data faster than the receiver can handle (independent of whether the 
processing can keep up), then you will lose data if you are using UDP. You have 
to use TCP to naturally control the sending rate to match the receiving rate at 
the receiver, without dropping data.
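
If switching to TCP is feasible, the built-in socket receiver gives that flow 
control for free. A minimal sketch, reusing the jssc from the code quoted later 
in this thread; the host name and the storage level are placeholders, not 
details from the thread.

// TCP alternative to the custom UDP receiver: TCP flow control naturally
// slows the sender down instead of packets being silently dropped.
JavaReceiverInputDStream<String> lines =
        jssc.socketTextStream("sender-host", 8060, StorageLevel.MEMORY_AND_DISK_SER_2());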

 

 

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj  wrote:

This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of AsyncRDDActions 
in Java ?

 

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj  wrote:

I am receiving data at UDP port 8060, processing it using Spark, and storing 
the output in Neo4j.

But the data I'm receiving and the data that is getting stored don't match, 
probably because the Neo4j API takes too long to push the data into the 
database. Meanwhile, Spark is unable to receive data, probably because the 
process is blocked.

 

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das  wrote:

Can you elaborate on how the data loss is occurring?

 

 

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj  wrote:

That is completely alright, as the system will make sure the work gets done.

My major concern is the data drop. Will using async stop the data loss? 

 

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das  wrote:

If you cannot push data as fast as you are gener

RE: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Evo Eftimov
If your message consumption rate is higher than your processing rate – i.e. 
processing ALL the data of a micro batch (the next RDD produced for your stream) 
takes longer than the batch interval – the following happens; let's say that e.g. 
your micro batch time is 3 sec:

 

1.   Based on your message streaming and consumption rate, you get e.g. a 
500 MB RDD to be processed during the next 3 sec micro batch 

2.   However the work performed on the RDD by your streaming job takes more 
than 3 sec

3.   In the meantime the next RDD comes in and occupies another 500 MB, and so 
on and so forth, until the current iteration of the job crashes due to what is 
essentially memory exhaustion (no more free RAM for the next RDD) – in effect a 
memory leak

 

The above can be called a design flaw, because Spark Streaming seems to rely on 
the default behavior of Spark batch processing, which is to evict in-memory-only 
RDDs when there is no more free memory in the system. In a batch context Spark 
can always recreate an evicted RDD from e.g. the file system, while in a 
streaming context the data is gone forever.

 

You can check whether the above behavior is the reason for your lost messages 
by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to 
see whether your streaming app has any LOST JOBS and how many – each lost job 
is a lost RDD, and each lost RDD means lost messages.

 

The above can be overcome by using one of the following measures:

 

1.   Set the Receiver rate to a level which will allow your job to complete 
within the micro-batch interval (obviously you are voluntarily limiting your 
performance in this way)

2.   Throw more boxes/cores/RAM at the problem and also improve the performance 
of the tasks performing the work on the messages (e.g. review and refactor the 
code)

3.   Set the Storage Level of the RDDs to “Memory AND Disk” – this will keep 
using RAM while there is free space and then spill to disk rather than crashing 
miserably and losing the affected job iteration and all its messages – obviously 
every time it has to resort to the disk your performance will take a hit

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

 

Something does not make sense. Receivers (currently) do not get blocked 
(unless a rate limit has been set) due to processing load. The receiver will 
continue to receive data and store it in memory until it is processed. So I 
am still not sure how the data loss is happening, unless you are sending data 
at a faster rate than the receiver can handle (that is, more than the maximum 
rate at which the receiver can save data in memory and replicate it to other nodes). 

 

In general, if you are particular about data loss, then UDP is not really a 
good choice in the first place. If you can use TCP, try it. It would at 
least eliminate the possibility that I mentioned above. Ultimately, if you try 
sending data faster than the receiver can handle (independent of whether the 
processing can keep up), then you will lose data if you are using UDP. You have 
to use TCP to naturally control the sending rate to match the receiving rate at 
the receiver, without dropping data.

 

 

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj  wrote:

This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of AsyncRDDActions 
in Java ?

 

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj  wrote:

I am receiving data at UDP port 8060, processing it using Spark, and storing 
the output in Neo4j.

But the data I'm receiving and the data that is getting stored don't match, 
probably because the Neo4j API takes too long to push the data into the 
database. Meanwhile, Spark is unable to receive data, probably because the 
process is blocked.

 

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das  wrote:

Can you elaborate on how the data loss is occurring?

 

 

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj  wrote:

That is completely alright, as the system will make sure the work gets done.

My major concern is the data drop. Will using async stop the data loss? 

 

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das  wrote:

If you cannot push data as fast as you are generating it, then async isn't going 
to help either. The "work" is just going to keep piling up as many, many async 
jobs, even though your batch processing times will be low, as that processing 
time is not going to reflect how much of the overall work is pending in the system.

 

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj  wrote:

Hi,

 

From my understanding of Spark Streaming, I created a Spark entry point, for 
continuous UDP data, using:

 

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, 

Re: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Tathagata Das
Something does not make sense. Receivers (currently) do not get blocked
(unless a rate limit has been set) due to processing load. The receiver will
continue to receive data and store it in memory until it is processed.
So I am still not sure how the data loss is happening, unless you are
sending data at a faster rate than the receiver can handle (that is, more than
the maximum rate at which the receiver can save data in memory and replicate it
to other nodes).

In general, if you are particular about data loss, then UDP is not really a
good choice in the first place. If you can use TCP, try it. It would
at least eliminate the possibility that I mentioned above. Ultimately, if
you try sending data faster than the receiver can handle (independent of
whether the processing can keep up), then you will lose data if you are using
UDP. You have to use TCP to naturally control the sending rate to match the
receiving rate at the receiver, without dropping data.


On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj  wrote:

> This is just a friendly ping, just to remind you of my query.
>
> Also, is there a possible explanation/example on the usage of
> AsyncRDDActions in Java ?
>
> On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj 
> wrote:
>
>> I am received data at UDP port 8060 and doing processing on it using
>> Spark and storing the output in Neo4j.
>>
>> But the data I'm receiving and the data that is getting stored doesn't
>> match probably because Neo4j API takes too long to push the data into
>> database. Meanwhile, Spark is unable to receive data probably because the
>> process is blocked.
>>
>> On Thu, May 21, 2015 at 5:28 PM, Tathagata Das 
>> wrote:
>>
>>> Can you elaborate on how the data loss is occurring?
>>>
>>>
>>> On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj 
>>> wrote:
>>>
 That is completely alright, as the system will make sure the works get
 done.

 My major concern is, the data drop. Will using async stop data loss?

 On Thu, May 21, 2015 at 4:55 PM, Tathagata Das 
 wrote:

> If you cannot push data as fast as you are generating it, then async
> isnt going to help either. The "work" is just going to keep piling up as
> many many async jobs even though your batch processing times will be low 
> as
> that processing time is not going to reflect how much of overall work is
> pending in the system.
>
> On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj 
> wrote:
>
>> Hi,
>>
>> From my understanding of Spark Streaming, I created a spark entry
>> point, for continuous UDP data, using:
>>
>> SparkConf conf = new 
>> SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
>>  jssc = new JavaStreamingContext(conf, new 
>> Duration(1));JavaReceiverInputDStream lines = 
>> jssc.receiverStream(new CustomReceiver(8060));
>>
>> Now, when I process this input stream using:
>>
>> JavaDStream hash=lines.flatMap()JavaPairDStream tuple= 
>> hash.mapToPair()JavaPairDStream output= 
>> tuple.reduceByKey()
>> output.foreachRDD(
>> new 
>> Function2>,Time,Void>(){
>> @Override
>> public Void call(
>> JavaPairRDD> arg0,
>> Time arg1) throws Exception {
>> // TODO Auto-generated method stub
>> new AsyncRDDActions(arg0.rdd(), null);
>> arg0.foreachPartition(
>> new 
>> VoidFunction>>>(){
>>
>> @Override
>> public void call(
>> Iterator> ArrayList>> arg0)
>> throws Exception {
>>
>> // TODO Auto-generated method 
>> stub
>> GraphDatabaseService graphDb = 
>> new 
>> GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>> 
>> .setConfig("remote_shell_enabled", "true")
>> .newGraphDatabase();
>>
>> try (Transaction tx = 
>> graphDb.beginTx()) {
>> while (arg0.hasNext()) {
>> Tuple2 < String, 
>> ArrayList < String >> tuple = arg0.next();
>> Node 
>> HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>> boolean oldHMac=false;
>> if (HMac!= null){
>>

Re: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Gautam Bajaj
This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example of the usage of
AsyncRDDActions in Java?
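
For reference, a minimal, assumption-laden sketch of what such a call could look 
like, assuming Spark 1.2 or later, where async actions are exposed directly on 
the Java RDD classes so AsyncRDDActions does not need to be constructed by hand. 
The writeToNeo4j helper and the surrounding class are placeholders, not code 
from this thread.

import java.util.ArrayList;
import java.util.Iterator;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

public class AsyncWriteExample {
    // pairRdd would be the JavaPairRDD handed to foreachRDD in the code quoted
    // further down this thread; the returned future can be checked or waited on
    // later without blocking the foreachRDD call itself.
    public static JavaFutureAction<Void> writeAsync(JavaPairRDD<String, ArrayList<String>> pairRdd) {
        return pairRdd.foreachPartitionAsync(
                new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, ArrayList<String>>> partition) throws Exception {
                        writeToNeo4j(partition); // placeholder
                    }
                });
    }

    private static void writeToNeo4j(Iterator<Tuple2<String, ArrayList<String>>> partition) {
        // stand-in for the Neo4j write logic shown later in this thread
    }
}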

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj  wrote:

> I am received data at UDP port 8060 and doing processing on it using Spark
> and storing the output in Neo4j.
>
> But the data I'm receiving and the data that is getting stored doesn't
> match probably because Neo4j API takes too long to push the data into
> database. Meanwhile, Spark is unable to receive data probably because the
> process is blocked.
>
> On Thu, May 21, 2015 at 5:28 PM, Tathagata Das 
> wrote:
>
>> Can you elaborate on how the data loss is occurring?
>>
>>
>> On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj 
>> wrote:
>>
>>> That is completely alright, as the system will make sure the works get
>>> done.
>>>
>>> My major concern is, the data drop. Will using async stop data loss?
>>>
>>> On Thu, May 21, 2015 at 4:55 PM, Tathagata Das 
>>> wrote:
>>>
 If you cannot push data as fast as you are generating it, then async
 isnt going to help either. The "work" is just going to keep piling up as
 many many async jobs even though your batch processing times will be low as
 that processing time is not going to reflect how much of overall work is
 pending in the system.

 On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj 
 wrote:

> Hi,
>
> From my understanding of Spark Streaming, I created a spark entry
> point, for continuous UDP data, using:
>
> SparkConf conf = new 
> SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
>  jssc = new JavaStreamingContext(conf, new 
> Duration(1));JavaReceiverInputDStream lines = 
> jssc.receiverStream(new CustomReceiver(8060));
>
> Now, when I process this input stream using:
>
> JavaDStream hash=lines.flatMap()JavaPairDStream tuple= 
> hash.mapToPair()JavaPairDStream output= 
> tuple.reduceByKey()
> output.foreachRDD(
> new 
> Function2>,Time,Void>(){
> @Override
> public Void call(
> JavaPairRDD> arg0,
> Time arg1) throws Exception {
> // TODO Auto-generated method stub
> new AsyncRDDActions(arg0.rdd(), null);
> arg0.foreachPartition(
> new 
> VoidFunction>>>(){
>
> @Override
> public void call(
> Iterator ArrayList>> arg0)
> throws Exception {
>
> // TODO Auto-generated method stub
> GraphDatabaseService graphDb = 
> new 
> GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
> 
> .setConfig("remote_shell_enabled", "true")
> .newGraphDatabase();
>
> try (Transaction tx = 
> graphDb.beginTx()) {
> while (arg0.hasNext()) {
> Tuple2 < String, 
> ArrayList < String >> tuple = arg0.next();
> Node 
> HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
> boolean oldHMac=false;
> if (HMac!= null){
> 
> System.out.println("Alread in Database:" + tuple._1);
> oldHMac=true;
> }
> else
> 
> HMac=Neo4jOperations.createHMac(graphDb, tuple._1);
>
> ArrayList 
> zipcodes=tuple._2;
> for(String zipcode : 
> zipcodes){
> Node 
> Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
> if(Zipcode!=null){
> 
> System.out.println("Already in Database:" + zipcode);
> if(oldHMac==true 
> && Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null)
> 

Re: Storing spark processed output to Database asynchronously.

2015-05-21 Thread Tathagata Das
Can you elaborate on how the data loss is occurring?


On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj  wrote:

> That is completely alright, as the system will make sure the works get
> done.
>
> My major concern is, the data drop. Will using async stop data loss?
>
> On Thu, May 21, 2015 at 4:55 PM, Tathagata Das 
> wrote:
>
>> If you cannot push data as fast as you are generating it, then async isnt
>> going to help either. The "work" is just going to keep piling up as many
>> many async jobs even though your batch processing times will be low as that
>> processing time is not going to reflect how much of overall work is pending
>> in the system.
>>
>> On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj 
>> wrote:
>>
>>> Hi,
>>>
>>> From my understanding of Spark Streaming, I created a spark entry point,
>>> for continuous UDP data, using:
>>>
>>> SparkConf conf = new 
>>> SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
>>>  jssc = new JavaStreamingContext(conf, new 
>>> Duration(1));JavaReceiverInputDStream lines = 
>>> jssc.receiverStream(new CustomReceiver(8060));
>>>
>>> Now, when I process this input stream using:
>>>
>>> JavaDStream hash=lines.flatMap()JavaPairDStream tuple= 
>>> hash.mapToPair()JavaPairDStream output= 
>>> tuple.reduceByKey()
>>> output.foreachRDD(
>>> new 
>>> Function2>,Time,Void>(){
>>> @Override
>>> public Void call(
>>> JavaPairRDD> arg0,
>>> Time arg1) throws Exception {
>>> // TODO Auto-generated method stub
>>> new AsyncRDDActions(arg0.rdd(), null);
>>> arg0.foreachPartition(
>>> new 
>>> VoidFunction>>>(){
>>>
>>> @Override
>>> public void call(
>>> Iterator>> ArrayList>> arg0)
>>> throws Exception {
>>>
>>> // TODO Auto-generated method stub
>>> GraphDatabaseService graphDb = new 
>>> GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>>> 
>>> .setConfig("remote_shell_enabled", "true")
>>> .newGraphDatabase();
>>>
>>> try (Transaction tx = 
>>> graphDb.beginTx()) {
>>> while (arg0.hasNext()) {
>>> Tuple2 < String, ArrayList 
>>> < String >> tuple = arg0.next();
>>> Node 
>>> HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>>> boolean oldHMac=false;
>>> if (HMac!= null){
>>> 
>>> System.out.println("Alread in Database:" + tuple._1);
>>> oldHMac=true;
>>> }
>>> else
>>> 
>>> HMac=Neo4jOperations.createHMac(graphDb, tuple._1);
>>>
>>> ArrayList 
>>> zipcodes=tuple._2;
>>> for(String zipcode : 
>>> zipcodes){
>>> Node 
>>> Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
>>> if(Zipcode!=null){
>>> 
>>> System.out.println("Already in Database:" + zipcode);
>>> if(oldHMac==true && 
>>> Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null)
>>> 
>>> Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
>>> else
>>> 
>>> Neo4jOperations.travelTo(HMac, Zipcode);
>>> }
>>> else{
>>> 
>>> Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode);
>>> 
>>> Neo4jOperations.travelTo(HMac, Zipcode);
>>> }
>>> }
>>> }
>>> tx.success();
>>>   

Re: Storing spark processed output to Database asynchronously.

2015-05-21 Thread Gautam Bajaj
That is completely alright, as the system will make sure the work gets done.

My major concern is the data drop. Will using async stop the data loss?

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das  wrote:

> If you cannot push data as fast as you are generating it, then async isnt
> going to help either. The "work" is just going to keep piling up as many
> many async jobs even though your batch processing times will be low as that
> processing time is not going to reflect how much of overall work is pending
> in the system.
>
> On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj 
> wrote:
>
>> Hi,
>>
>> From my understanding of Spark Streaming, I created a spark entry point,
>> for continuous UDP data, using:
>>
>> SparkConf conf = new 
>> SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
>>  jssc = new JavaStreamingContext(conf, new 
>> Duration(1));JavaReceiverInputDStream lines = 
>> jssc.receiverStream(new CustomReceiver(8060));
>>
>> Now, when I process this input stream using:
>>
>> JavaDStream hash=lines.flatMap()JavaPairDStream tuple= 
>> hash.mapToPair()JavaPairDStream output= tuple.reduceByKey()
>> output.foreachRDD(
>> new 
>> Function2>,Time,Void>(){
>> @Override
>> public Void call(
>> JavaPairRDD> arg0,
>> Time arg1) throws Exception {
>> // TODO Auto-generated method stub
>> new AsyncRDDActions(arg0.rdd(), null);
>> arg0.foreachPartition(
>> new 
>> VoidFunction>>>(){
>>
>> @Override
>> public void call(
>> Iterator> ArrayList>> arg0)
>> throws Exception {
>>
>> // TODO Auto-generated method stub
>> GraphDatabaseService graphDb = new 
>> GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>> 
>> .setConfig("remote_shell_enabled", "true")
>> .newGraphDatabase();
>>
>> try (Transaction tx = 
>> graphDb.beginTx()) {
>> while (arg0.hasNext()) {
>> Tuple2 < String, ArrayList < 
>> String >> tuple = arg0.next();
>> Node 
>> HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>> boolean oldHMac=false;
>> if (HMac!= null){
>> 
>> System.out.println("Alread in Database:" + tuple._1);
>> oldHMac=true;
>> }
>> else
>> 
>> HMac=Neo4jOperations.createHMac(graphDb, tuple._1);
>>
>> ArrayList 
>> zipcodes=tuple._2;
>> for(String zipcode : 
>> zipcodes){
>> Node 
>> Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
>> if(Zipcode!=null){
>> 
>> System.out.println("Already in Database:" + zipcode);
>> if(oldHMac==true && 
>> Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null)
>> 
>> Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
>> else
>> 
>> Neo4jOperations.travelTo(HMac, Zipcode);
>> }
>> else{
>> 
>> Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode);
>> 
>> Neo4jOperations.travelTo(HMac, Zipcode);
>> }
>> }
>> }
>> tx.success();
>> }
>> graphDb.shutdown();
>> }
>> });
>> return null;
>> }
>>

Re: Storing spark processed output to Database asynchronously.

2015-05-21 Thread Tathagata Das
If you cannot push data as fast as you are generating it, then async isn't
going to help either. The "work" is just going to keep piling up as many,
many async jobs, even though your batch processing times will be low, as that
processing time is not going to reflect how much of the overall work is pending
in the system.

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj  wrote:

> Hi,
>
> From my understanding of Spark Streaming, I created a spark entry point,
> for continuous UDP data, using:
>
> SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
> JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
> JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));
>
> Now, when I process this input stream using:
>
> JavaDStream<String> hash = lines.flatMap()
> JavaPairDStream<String, ArrayList<String>> tuple = hash.mapToPair()
> JavaPairDStream<String, ArrayList<String>> output = tuple.reduceByKey()
> output.foreachRDD(
>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>         @Override
>         public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
>             // TODO Auto-generated method stub
>             new AsyncRDDActions(arg0.rdd(), null);
>             arg0.foreachPartition(
>                 new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
>
>                     @Override
>                     public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
>
>                         // TODO Auto-generated method stub
>                         GraphDatabaseService graphDb = new GraphDatabaseFactory()
>                                 .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>                                 .setConfig("remote_shell_enabled", "true")
>                                 .newGraphDatabase();
>
>                         try (Transaction tx = graphDb.beginTx()) {
>                             while (arg0.hasNext()) {
>                                 Tuple2<String, ArrayList<String>> tuple = arg0.next();
>                                 Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>                                 boolean oldHMac = false;
>                                 if (HMac != null) {
>                                     System.out.println("Alread in Database:" + tuple._1);
>                                     oldHMac = true;
>                                 }
>                                 else
>                                     HMac = Neo4jOperations.createHMac(graphDb, tuple._1);
>
>                                 ArrayList<String> zipcodes = tuple._2;
>                                 for (String zipcode : zipcodes) {
>                                     Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
>                                     if (Zipcode != null) {
>                                         System.out.println("Already in Database:" + zipcode);
>                                         if (oldHMac == true && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
>                                             Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
>                                         else
>                                             Neo4jOperations.travelTo(HMac, Zipcode);
>                                     }
>                                     else {
>                                         Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
>                                         Neo4jOperations.travelTo(HMac, Zipcode);
>                                     }
>                                 }
>                             }
>                             tx.success();
>                         }
>                         graphDb.shutdown();
>                     }
>                 });
>             return null;
>         }
>     });
>
> The part of the code in output.foreachRDD pushes the output of Spark into the
> Neo4j database, checking for duplicate values.
>
> This part of the code is very time consuming, because of which my processing
> time exceeds the batch time. Because of that, it *results in data loss*. So, I
> was thinking