RE: Storing spark processed output to Database asynchronously.
… and measure 4 is to implement a custom feedback loop, e.g. to monitor the amount of free RAM and the number of queued jobs, and automatically decrease the message consumption rate of the Receiver until the number of backlogged RDDs and jobs subsides (again, here you artificially decrease your performance in the name of the reliability/integrity of your system, i.e. not losing messages).

From: Evo Eftimov [mailto:evo.efti...@isecc.com]
Sent: Friday, May 22, 2015 9:39 PM
To: 'Tathagata Das'; 'Gautam Bajaj'
Cc: 'user'
Subject: RE: Storing spark processed output to Database asynchronously.

If the message consumption rate is higher than the time required to process ALL data for a micro-batch (i.e. the next RDD produced for your stream), the following happens – let's say your micro-batch time is 3 sec:

1. Based on your message streaming and consumption rate, you get e.g. a 500 MB RDD to be processed during the next 3-sec micro-batch.
2. However, the work performed on the RDD by your streaming job takes more than 3 sec.
3. In the meantime the next RDD comes in and occupies another 500 MB, and so on, until the current iteration of the job crashes due to what is essentially memory exhaustion (no more free RAM for the next RDD) – effectively a memory leak.

The above can be called a design flaw, because Spark Streaming seems to rely on the default behavior of Spark batch, which is to evict memory-only RDDs when there is no more free memory in the system. In a batch context, Spark can always recreate an evicted RDD from e.g. the file system, while in a streaming context the data is gone forever.

You can check whether the above behavior is the reason for your lost messages by reviewing the Driver logs for exceptions, and/or simply using the Spark UI to see whether your streaming app has any LOST JOBS and how many – each lost job is a lost RDD, which is lost messages.

The above can be overcome by one of the following measures:

1. Set the Receiver rate to a level which will allow your job to complete within the micro-batch interval (obviously you are voluntarily limiting your performance in this way).
2. Throw more boxes/cores/RAM at the problem, and also improve the performance of the tasks doing the work on the messages (e.g. review and refactor the code).
3. Set the storage level of the RDDs to "Memory AND Disk" – this will keep using RAM until it runs out and then spill to disk, rather than crashing miserably and losing the affected job iteration and all its messages. Obviously, every time it has to resort to disk, your performance will take a hit.

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

Something does not make sense. Receivers (currently) do not get blocked by processing load (unless a rate limit has been set). The receiver will continue to receive data and store it in memory until it is processed. So I am still not sure how the data loss is happening, unless you are sending data at a faster rate than the receiver can handle (more than the max rate at which the receiver can save data in memory and replicate it to other nodes).

In general, if you are particular about data loss, then UDP is not really a good choice in the first place. If you can use TCP, try it. It would at least eliminate the possibility I mentioned above. Ultimately, if you send data faster than the receiver can handle (independent of whether processing can keep up), then you will lose data if you are using UDP. You have to use TCP to naturally throttle the sending rate to match the receiving rate of the receiver, without dropping data.

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj wrote:
This is just a friendly ping, just to remind you of my query. Also, is there a possible explanation/example of the usage of AsyncRDDActions in Java?

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj wrote:
I am receiving data at UDP port 8060, processing it with Spark, and storing the output in Neo4j. But the data I'm receiving and the data that is getting stored don't match, probably because the Neo4j API takes too long to push the data into the database. Meanwhile, Spark is unable to receive data, probably because the process is blocked.

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das wrote:
Can you elaborate on how the data loss is occurring?

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj wrote:
That is completely alright, as the system will make sure the work gets done. My major concern is the data drop. Will using async stop data loss?

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das wrote:
If you cannot push data as fast as you are generating it, then async isn't going to help either. The "work" is just going to keep piling up as many async jobs, even though your batch processing times will be low, as that processing time does not reflect how much overall work is pending in the system.
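[Editor's note: measure 4 is not an off-the-shelf Spark facility. A minimal sketch of such a feedback loop, in plain Java with made-up names and thresholds (the `RateFeedback` class and the 0.2/0.5 free-RAM watermarks are illustrative, not a Spark API), might look like this; in a real deployment the resulting cap would be fed back to the Receiver, e.g. via the `spark.streaming.receiver.maxRate` setting that measure 1 relies on.]

```java
// Sketch of a custom feedback loop (measure 4): back off the receiver rate cap
// when free RAM is low or batches queue up, and recover slowly once both clear.
// All names and thresholds here are illustrative, not part of any Spark API.
class RateFeedback {
    private long ratePerSec;      // current cap on records/sec fed to the receiver
    private final long floor;     // never throttle below this rate

    RateFeedback(long initialRate, long floor) {
        this.ratePerSec = initialRate;
        this.floor = floor;
    }

    /** One feedback step; returns the new rate cap. */
    long adjust(double freeRamFraction, int queuedBatches) {
        if (freeRamFraction < 0.2 || queuedBatches > 2) {
            ratePerSec = Math.max(floor, ratePerSec / 2);   // back off hard under pressure
        } else if (freeRamFraction > 0.5 && queuedBatches == 0) {
            ratePerSec = ratePerSec + ratePerSec / 10;      // recover gently when healthy
        }
        return ratePerSec;
    }
}
```

The multiplicative decrease with gentle additive increase mirrors TCP-style congestion control: react fast when RAM or the job queue signals trouble, and probe back up slowly once the backlog drains.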
Re: Storing spark processed output to Database asynchronously.
If you cannot push data as fast as you are generating it, then async isn't going to help either. The "work" is just going to keep piling up as many async jobs, even though your batch processing times will be low, as that processing time does not reflect how much overall work is pending in the system.

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj wrote:
Hi,

From my understanding of Spark Streaming, I created a Spark entry point, for continuous UDP data, using:

    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
    JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

Now, when I process this input stream using:

    JavaDStream<String> hash = lines.flatMap(...);
    JavaPairDStream<String, ArrayList<String>> tuple = hash.mapToPair(...);
    JavaPairDStream<String, ArrayList<String>> output = tuple.reduceByKey(...);
    output.foreachRDD(
        new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
            @Override
            public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
                new AsyncRDDActions(arg0.rdd(), null);
                arg0.foreachPartition(
                    new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
                        @Override
                        public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
                            GraphDatabaseService graphDb = new GraphDatabaseFactory()
                                    .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
                                    .setConfig("remote_shell_enabled", "true")
                                    .newGraphDatabase();
                            try (Transaction tx = graphDb.beginTx()) {
                                while (arg0.hasNext()) {
                                    Tuple2<String, ArrayList<String>> tuple = arg0.next();
                                    Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                                    boolean oldHMac = false;
                                    if (HMac != null) {
                                        System.out.println("Already in Database:" + tuple._1);
                                        oldHMac = true;
                                    } else {
                                        HMac = Neo4jOperations.createHMac(graphDb, tuple._1);
                                    }
                                    ArrayList<String> zipcodes = tuple._2;
                                    for (String zipcode : zipcodes) {
                                        Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
                                        if (Zipcode != null) {
                                            System.out.println("Already in Database:" + zipcode);
                                            if (oldHMac && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
                                                Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
                                            else
                                                Neo4jOperations.travelTo(HMac, Zipcode);
                                        } else {
                                            Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
                                            Neo4jOperations.travelTo(HMac, Zipcode);
                                        }
                                    }
                                }
                                tx.success();
                            }
                            graphDb.shutdown();
                        }
                    });
                return null;
            }
        });

The part of the code in output.foreachRDD pushes the output of Spark into the Neo4j database, checking for duplicate values.

This part of the code is very time consuming, because of which my processing time exceeds the batch time. Because of that, it *results in data loss*. So, I was thinking
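[Editor's note: Tathagata's point above, that async submission cannot help when data arrives faster than it is processed, comes down to simple queue arithmetic. A minimal sketch in plain Java (no Spark; `BacklogDemo` and its parameters are illustrative names only):]

```java
// Minimal backlog sketch: if data arrives faster than it is drained, pending
// work grows without bound, no matter how the draining is scheduled (sync or async).
class BacklogDemo {
    /** Returns pending work after `steps` ticks of arrivals vs. service. */
    static long pendingAfter(long arrivalsPerTick, long servicePerTick, int steps) {
        long pending = 0;
        for (int i = 0; i < steps; i++) {
            pending += arrivalsPerTick;                    // receiver keeps storing blocks
            pending -= Math.min(pending, servicePerTick);  // processing drains what it can
        }
        return pending;
    }
}
```

With arrivals of 500 units per batch against a service rate of 300, the backlog after 10 batches is 2000 units and keeps growing linearly; making the Neo4j writes asynchronous only moves where that queue lives, it does not shrink it.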