… and measure 4 is to implement a custom Feedback Loop to e.g. monitor the 
amount of free RAM and the number of queued jobs and automatically decrease the 
message consumption rate of the Receiver until the number of clogged RDDs and 
Jobs subsides (again, here you artificially decrease your performance in the 
name of the reliability/integrity of your system, i.e. not losing messages).
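
A minimal sketch of such a feedback loop in Java, under stated assumptions: the class name RateFeedbackLoop, the thresholds and the "halve under pressure, recover by 10%" policy are all hypothetical and not part of Spark. How the free RAM and queued job readings are obtained, and how the resulting rate is applied to your custom Receiver, is left out because it depends on your own code.

```java
/** Hypothetical controller (not a Spark API): lowers the receiver rate while the job is backed up. */
final class RateFeedbackLoop {
    private final long minFreeBytes;  // assumed threshold below which RAM counts as scarce
    private final int maxQueuedJobs;  // assumed threshold above which jobs count as clogged
    private final int minRate;        // never throttle below this many messages per second
    private final int maxRate;        // never exceed this many messages per second
    private int currentRate;

    RateFeedbackLoop(long minFreeBytes, int maxQueuedJobs, int minRate, int maxRate) {
        if (minRate <= 0 || maxRate < minRate) {
            throw new IllegalArgumentException("need 0 < minRate <= maxRate");
        }
        this.minFreeBytes = minFreeBytes;
        this.maxQueuedJobs = maxQueuedJobs;
        this.minRate = minRate;
        this.maxRate = maxRate;
        this.currentRate = maxRate;
    }

    /** One control step: halve the rate under pressure, otherwise recover by about 10%. */
    int adjust(long freeBytes, int queuedJobs) {
        boolean underPressure = freeBytes < minFreeBytes || queuedJobs > maxQueuedJobs;
        if (underPressure) {
            currentRate = Math.max(minRate, currentRate / 2);
        } else {
            currentRate = Math.min(maxRate, currentRate + Math.max(1, currentRate / 10));
        }
        return currentRate;
    }

    public static void main(String[] args) {
        RateFeedbackLoop loop = new RateFeedbackLoop(256L * 1024 * 1024, 3, 100, 10_000);
        // Made-up readings for illustration: {free bytes, queued jobs}
        long[][] readings = { {1L << 30, 0}, {100L << 20, 2}, {1L << 30, 6}, {1L << 30, 1} };
        for (long[] r : readings) {
            System.out.println("rate = " + loop.adjust(r[0], (int) r[1]) + " msg/sec");
        }
    }
}
```

The idea would be to call adjust() on a timer (e.g. once per micro batch) and have the Receiver consult the returned rate before pulling more messages.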

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 9:39 PM
To: 'Tathagata Das'; 'Gautam Bajaj'
Cc: 'user'
Subject: RE: Storing spark processed output to Database asynchronously.

 

If messages are consumed faster than your job can process ALL the 
data for a micro batch (i.e. the next RDD produced for your stream), the 
following happens. Let's say that e.g. your micro batch time is 3 sec:

 

1.       Based on your message streaming and consumption rate, you get e.g. a 
500 MB RDD to be processed during the next 3 sec micro batch 

2.       However the work performed on the RDD by your streaming job takes more 
than 3 sec

3.       In the meantime the next RDD comes in and occupies another 500 MB, and 
so on and so forth, until the current iteration of the job crashes because 
memory is exhausted (no more free RAM for the next RDD), in what is 
essentially a memory leak
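
The arithmetic behind steps 1 to 3 can be sketched in Java as follows. This is a deliberately simplified model and not how Spark accounts for memory: it assumes one RDD of fixed size arrives per batch interval, batches are processed strictly one after another at a constant duration, and an RDD is only freed once its batch has completed. The class and method names are hypothetical.

```java
/** Hypothetical back-of-the-envelope model of unprocessed RDDs piling up in memory. */
final class BacklogEstimate {

    /**
     * Returns the number of batch intervals after which the unprocessed RDDs
     * no longer fit into freeMb, or -1 if processing keeps up with arrival.
     */
    static long intervalsUntilExhaustion(long batchMs, long processingMs, long rddMb, long freeMb) {
        if (batchMs <= 0 || processingMs <= 0 || rddMb <= 0) {
            throw new IllegalArgumentException("durations and RDD size must be positive");
        }
        if (processingMs <= batchMs) {
            return -1; // each batch finishes before the next one arrives: no backlog
        }
        for (long n = 1; ; n++) {
            long completed = (n * batchMs) / processingMs; // batches finished so far
            long pending = n - completed;                   // RDDs still held in memory
            if (pending * rddMb > freeMb) {
                return n;
            }
        }
    }

    public static void main(String[] args) {
        long n = intervalsUntilExhaustion(3000, 4500, 500, 4000);
        System.out.println("memory exhausted after " + n + " intervals (" + (n * 3) + " sec)");
    }
}
```

With a 3 sec batch, 4.5 sec of processing per batch, 500 MB RDDs and 4000 MB of free memory, this model runs out of memory after 25 batch intervals, i.e. about 75 sec.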

 

The above can be called a design flaw, because Spark Streaming seems to rely 
on the default behavior of Spark Batch, which is to remove In Memory Only RDDs 
when there is no more free memory in the system. In a batch context 
Spark Batch can always recreate a removed RDD from e.g. the file system, while 
in a streaming context the data is gone forever.

 

You can check whether the above behavior is the reason for your lost messages 
by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to 
see whether your streaming app has any LOST JOBS and how many. Each lost job 
is a lost RDD, and each lost RDD is a batch of lost messages.

 

The above can be overcome by using one of the following measures:

 

1.       Set the Receiver rate to a level which will allow your job to complete 
within the micro-batch interval (obviously you are voluntarily limiting your 
performance in this way)

2.       Throw more boxes/cores/RAM at the problem and also improve the 
performance of your tasks performing the work on the messages (e.g. review and 
refactor the code)

3.       Set the Storage Mode of the RDDs to “Memory AND Disk”. This will keep 
using the RAM while there is free space and then switch to disk rather than 
crashing miserably and losing the affected job iteration and all its messages. 
Obviously, every time it has to resort to the disk your performance will take a hit.

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

 

Something does not make sense. Receivers (currently) do not get blocked 
due to processing load (unless a rate limit has been set). The receiver will 
continue to receive data and store it in memory until it is processed. So I 
am still not sure how the data loss is happening, unless you are sending data 
at a faster rate than the receiver can handle (that is, more than the max rate 
at which the receiver can save data in memory and replicate it to other nodes). 

 

In general, if you are particular about data loss, then UDP is not really a 
good choice in the first place. If you can try using TCP, try it. It would at 
least eliminate the possibility that I mentioned above. Ultimately, if you try 
sending data faster than the receiver can handle (independent of whether 
processing can keep up), then you will lose data if you are using UDP. You have 
to use TCP to naturally control the sending rate to match the receiving rate in 
the receiver, without dropping data.

 

 

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:

This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of AsyncRDDActions 
in Java ?

 

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj <gautam1...@gmail.com> wrote:

I am receiving data at UDP port 8060, processing it using Spark and 
storing the output in Neo4j.

But the data I'm receiving and the data that is getting stored don't match, 
probably because the Neo4j API takes too long to push the data into the database. 
Meanwhile, Spark is unable to receive data, probably because the process is 
blocked.

 

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das <t...@databricks.com> wrote:

Can you elaborate on how the data loss is occurring?

 

 

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:

That is completely alright, as the system will make sure the work gets done.

My major concern is the data drop. Will using async stop data loss? 

 

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com> wrote:

If you cannot push data as fast as you are generating it, then async isn't going 
to help either. The "work" is just going to keep piling up as many, many async 
jobs, even though your batch processing times will look low, because that processing 
time is not going to reflect how much overall work is pending in the system.
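
A small Java sketch of that effect, using only the JDK rather than Spark: a single-threaded executor stands in for the slow database sink, and a latch stands in for the slow write. The class and method names are hypothetical. Submitting returns immediately, so each "batch" looks fast, but the work simply accumulates in the executor's queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: async submission hides a slow sink instead of fixing it. */
final class AsyncBacklogDemo {

    /** Submits the given number of slow writes and returns how many are still waiting in the queue. */
    static int pendingAfterSubmitting(int batches) {
        if (batches <= 0) {
            return 0;
        }
        ThreadPoolExecutor writer = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch sinkUnblocked = new CountDownLatch(1);
        try {
            for (int i = 0; i < batches; i++) {
                // execute() returns at once, like an async action on each batch
                writer.execute(() -> {
                    firstStarted.countDown();
                    try {
                        sinkUnblocked.await(); // stands in for a slow database write
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            try {
                firstStarted.await(); // wait until the single worker is busy with the first write
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return writer.getQueue().size(); // everything behind the first write is still pending
        } finally {
            sinkUnblocked.countDown(); // let the queued writes drain
            writer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pending writes: " + pendingAfterSubmitting(5));
    }
}
```

With 5 submitted writes and one worker stuck on the first, 4 remain queued. If the sink never catches up, that queue only grows, which is the same pile-up as before, just moved out of the batch processing time.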

 

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj <gautam1...@gmail.com> wrote:

Hi,

 

From my understanding of Spark Streaming, I created a Spark entry point for 
continuous UDP data using:

 

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));
JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

Now, when I process this input stream using:

JavaDStream hash=lines.flatMap(<my-code>)
JavaPairDStream tuple= hash.mapToPair(<my-code>)
JavaPairDStream output= tuple.reduceByKey(<my-code>)
output.foreachRDD(
        new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
            @Override
            public Void call(
                    JavaPairRDD<String, ArrayList<String>> arg0,
                    Time arg1) throws Exception {
                // TODO Auto-generated method stub
                // Note: this only constructs the wrapper and discards it;
                // nothing runs asynchronously as a result of this line.
                new AsyncRDDActions(arg0.rdd(), null);
                arg0.foreachPartition(
                        new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {

                            @Override
                            public void call(
                                    Iterator<Tuple2<String, ArrayList<String>>> arg0)
                                    throws Exception {

                                // TODO Auto-generated method stub
                                GraphDatabaseService graphDb = new GraphDatabaseFactory()
                                        .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
                                        .setConfig("remote_shell_enabled", "true")
                                        .newGraphDatabase();

                                try (Transaction tx = graphDb.beginTx()) {
                                    while (arg0.hasNext()) {
                                        Tuple2<String, ArrayList<String>> tuple = arg0.next();
                                        Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                                        boolean oldHMac = false;
                                        if (HMac != null) {
                                            System.out.println("Already in Database:" + tuple._1);
                                            oldHMac = true;
                                        }
                                        else
                                            HMac = Neo4jOperations.createHMac(graphDb, tuple._1);

                                        ArrayList<String> zipcodes = tuple._2;
                                        for (String zipcode : zipcodes) {
                                            Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
                                            if (Zipcode != null) {
                                                System.out.println("Already in Database:" + zipcode);
                                                if (oldHMac == true && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
                                                    Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
                                                else
                                                    Neo4jOperations.travelTo(HMac, Zipcode);
                                            }
                                            else {
                                                Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
                                                Neo4jOperations.travelTo(HMac, Zipcode);
                                            }
                                        }
                                    }
                                    tx.success();
                                }
                                graphDb.shutdown();
                            }
                        });
                return null;
            }
        });

The part of the code in output.foreachRDD pushes the output of Spark into the 
Neo4j database, checking for duplicate values.

This part of the code is very time consuming, because of which my processing 
time exceeds the batch time. That results in data loss. So, I was thinking 
of pushing the output into the database asynchronously. 
I found 
AsyncRDDActions (https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html)
for this purpose, but cannot find a working example for it in Java, 
especially for the function foreachPartitionAsync, inside which we have to use 
"Function1".

Any help is appreciated.

 

Thanks,

Gautam

 





 

-- 

Gautam

 





 
