Re: Distinct is very slow
Anyone have any thoughts on this?

On 22 April 2015 at 22:49, Jeetendra Gangele gangele...@gmail.com wrote:
I made 7000 tasks in mapToPair, and in distinct I also made the same number of tasks. A lot of shuffle read and write is still happening, which makes the application run much longer. Any idea?

On 17 April 2015 at 11:55, Akhil Das ak...@sigmoidanalytics.com wrote:
How many tasks are you seeing in your mapToPair stage? Is it 7000? Then I suggest giving a number similar/close to 7000 in your .distinct call. What is happening in your case is that you are repartitioning your data down to a smaller number (32), which I believe puts a lot of load on processing; you can try increasing it.
Thanks
Best Regards

On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele gangele...@gmail.com wrote:
Akhil, any thought on this?

On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote:
No, I did not try the partitioning. Below is the full code:

public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, JavaSparkContext jsc) throws IOException {
  long start = System.currentTimeMillis();

  // Index every vendor record and convert it to matcher-relevant data.
  JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex().mapToPair(
      new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
        @Override
        public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t) throws Exception {
          MatcherReleventData matcherData = new MatcherReleventData();
          return new Tuple2<Long, MatcherReleventData>(t._2,
              matcherData.convertVendorDataToMatcherData(t._1));
        }
      }).cache();
  log.info("after index" + RddForMarch.take(1));

  Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
  Map<Long, MatcherReleventData> matchData = new HashMap<Long, MatcherReleventData>(tmp);
  final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

  // Expand each record into its blocking keys (company name, phone, zip, domain).
  JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
      new Function<MatcherReleventData, Iterable<String>>() {
        @Override
        public Iterable<String> call(MatcherReleventData v1) throws Exception {
          List<String> values = new ArrayList<String>();
          HelperUtilities helper1 = new HelperUtilities();
          MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
          if (matchkeys.get_companyName() != null) values.add(matchkeys.get_companyName());
          if (matchkeys.get_phoneNumberr() != null) values.add(matchkeys.get_phoneNumberr());
          if (matchkeys.get_zipCode() != null) values.add(matchkeys.get_zipCode());
          if (matchkeys.getM_domain() != null) values.add(matchkeys.getM_domain());
          return values;
        }
      });
  log.info("blocking RDD is" + blockingRdd.count());

  int count = 0;
  log.info("Starting printing");
  for (Tuple2<Long, String> entry : blockingRdd.collect()) {
    log.info(entry._1() + " : " + entry._2());
    count++;
  }
  log.info("total count" + count);

  // Look up candidate ids for every blocking key, then dedupe.
  JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
      new Function<String, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> call(String v1) throws Exception {
          return ckdao.getSingelkeyresult(v1);
        }
      }).distinct(32);
  log.info("after hbase count is" + completeDataToprocess.count());
  log.info("data for process" + completeDataToprocess.take(1));

  // Score every (record, candidate) pair.
  JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess.mapToPair(
      new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t) throws Exception {
          Scoring scoreObj = new Scoring();
          double score = scoreObj.computeMatchScore(companyDAO.get(t._2()),
              dataMatchGlobal.getValue().get(t._1()));
          return new Tuple2<Long, Tuple2<Integer, Double>>(t._1(),
              new Tuple2<Integer, Double>(t._2(), score));
        }
      });
  log.info("with score tuple is" + withScore.take(1));

  // Keep the best-scoring candidate per record.
  JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
      new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
            Tuple2<Integer, Double> v2) throws Exception {
          int res = v1._2().compareTo(v2._2());
          if (res > 0) {
            return new Tuple2<Integer, Double>(v1._1(), v1._2());
          } else {
            return new Tuple2<Integer, Double>(v2._1(), v2._2());
          }
        }
      });
  log.info("max score RDD" + maxScoreRDD.take(10));

  maxScoreRDD.foreach(
      new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
        @Override
        public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
          MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
          log.info("broadcast is" + dataMatchGlobal.getValue().get(t._1()));
          // Set the score for better understanding of merge
          matchedData.setScore(t._2()._2());
          vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(), Souce_id);
        }
      });
  log.info("took " + (System.currentTimeMillis() - start) + " millis to run matcher");
}
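Akhil's suggestion can be illustrated with a small, Spark-free sketch. distinct(numPartitions) hashes each element into one of numPartitions buckets, so cutting roughly 7000 map tasks down to 32 reduce partitions piles about 25,000 of the 800K records onto every reducer, while 7000 output partitions would leave each with only about 114. The class below is a hypothetical illustration that mimics hash partitioning with Math.floorMod; it is not Spark's actual HashPartitioner:

```java
import java.util.Arrays;

public class PartitionLoad {
    // Non-negative bucket index from a key's hashCode, as a hash partitioner would compute.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Distribute n synthetic keys over numPartitions buckets and report the busiest bucket.
    static long maxLoad(int n, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (int i = 0; i < n; i++) {
            counts[partitionFor("key-" + i, numPartitions)]++;
        }
        return Arrays.stream(counts).max().getAsLong();
    }

    public static void main(String[] args) {
        int records = 800_000;
        // 800K / 32 = 25,000 records per reducer; 800K / 7000 is only ~114.
        System.out.println("max load with 32 partitions:   " + maxLoad(records, 32));
        System.out.println("max load with 7000 partitions: " + maxLoad(records, 7000));
    }
}
```

The absolute numbers are synthetic, but the ratio is the point: with 32 partitions each distinct reducer deduplicates a few hundred times more data than it would with a partition count near the parent stage's task count.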
Re: Distinct is very slow
How many tasks are you seeing in your mapToPair stage? Is it 7000? Then I suggest giving a number similar/close to 7000 in your .distinct call. What is happening in your case is that you are repartitioning your data down to a smaller number (32), which I believe puts a lot of load on processing; you can try increasing it.
Thanks
Best Regards

On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele gangele...@gmail.com wrote:
Akhil, any thought on this?

On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote:
No, I did not try the partitioning. Below is the full code. [... full matchAndMerge listing, quoted in the first message of this thread, trimmed here ...]

On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote:
Can you paste your complete code? Did you try repartitioning/increasing the level of parallelism to speed up the processing? Since you have 16 cores, and I'm assuming your 400k records isn't bigger than a 10G dataset.
Re: Distinct is very slow
I am saying to partition with something like partitionBy(new HashPartitioner(16)); will this not work?

On 17 April 2015 at 21:28, Jeetendra Gangele gangele...@gmail.com wrote:
I have given 3000 tasks to mapToPair, and now it is taking so much memory and wasting time on shuffling. Here are the stats when I run with very small data; almost all of it is being shuffled, not sure what is happening here. Any idea?
- Total task time across all tasks: 11.0 h
- Shuffle read: 153.8 MB
- Shuffle write: 288.0 MB

On 17 April 2015 at 14:32, Jeetendra Gangele gangele...@gmail.com wrote:
mapToPair is running with 32 tasks but very slowly because of a lot of shuffle read (screenshot attached); each task runs for 10 minutes, even though inside the function I am not doing anything costly.
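On the partitionBy(new HashPartitioner(16)) idea: a hash partitioner assigns each key to a fixed partition computed from its hashCode, which is what guarantees that all copies of a key meet in the same task for distinct or reduceByKey. A minimal Spark-free sketch of that assignment rule (an illustration of the scheme, not Spark's own HashPartitioner code):

```java
public class HashPartitionSketch {
    // Assign a key to one of numPartitions buckets. Keys can have a negative
    // hashCode (e.g. "polygenelubricants" hashes to Integer.MIN_VALUE), so
    // Math.floorMod is used to keep the index in [0, numPartitions), where a
    // plain % could go negative.
    static int assign(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int parts = 16;
        // The same key always lands in the same partition, so duplicates of a
        // (Long, Integer) pair can be dropped locally within one task.
        System.out.println(assign("acme-corp", parts) == assign("acme-corp", parts));
        System.out.println(assign("polygenelubricants", parts));
    }
}
```

The determinism is the important property here: pre-partitioning only helps if the later key-based operation uses the same partitioner, otherwise Spark shuffles again anyway.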
Re: Distinct is very slow
Can you paste your complete code? Did you try repartitioning/increasing the level of parallelism to speed up the processing? Since you have 16 cores, and I'm assuming your 400k records isn't bigger than a 10G dataset.
Thanks
Best Regards

On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com wrote:
I already checked, and GC is taking 1 sec for each task. Is this too much? If yes, how do I avoid it?

On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote:
Open the driver UI and see which stage is taking time; you can look at whether it's adding any GC time etc.
Thanks
Best Regards

On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com wrote:
Hi All, I have the code below in which distinct runs for a long time. blockingRdd is pairs of (Long, String) and has 400K records:

JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
    new Function<String, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> call(String v1) throws Exception {
            return ckdao.getSingelkeyresult(v1);
        }
    }).distinct(32);

I am running distinct on 800K records and it takes 2 hours on 16 cores and 20 GB RAM.
Re: Distinct is very slow
Open the driver UI and see which stage is taking time; you can look at whether it's adding any GC time etc.
Thanks
Best Regards

On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com wrote:
Hi All, I have the code below in which distinct runs for a long time. blockingRdd is pairs of (Long, String) and has 400K records. [... flatMapValues + distinct(32) snippet, quoted earlier in this thread, trimmed ...] I am running distinct on 800K records and it takes 2 hours on 16 cores and 20 GB RAM.
Re: Distinct is very slow
I already checked, and GC is taking 1 sec for each task. Is this too much? If yes, how do I avoid it?

On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote:
Open the driver UI and see which stage is taking time; you can look at whether it's adding any GC time etc.
Thanks
Best Regards

On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com wrote:
Hi All, I have the code below in which distinct runs for a long time. blockingRdd is pairs of (Long, String) and has 400K records. [... flatMapValues + distinct(32) snippet, quoted earlier in this thread, trimmed ...] I am running distinct on 800K records and it takes 2 hours on 16 cores and 20 GB RAM.
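If GC really is costing about a second per task, the executor JVMs can be made to log their collections so the time can be inspected outside the Stages tab as well. A sketch of a spark-submit invocation (the class and jar names are placeholders; the GC flags are the ones for the Java 7/8 JVMs current at the time):

```shell
# Emit detailed GC logs from each executor via spark.executor.extraJavaOptions.
# com.example.Matcher and matcher-app.jar are hypothetical placeholders.
spark-submit \
  --class com.example.Matcher \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  matcher-app.jar
```

The resulting GC lines land in the executor stdout/stderr logs, reachable from the Executors tab of the driver UI; frequent full collections there usually point at too-few, too-large partitions or at large collected results on the driver.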
Re: Distinct is very slow
No, I did not try the partitioning. Below is the full code. [... full matchAndMerge listing, quoted in the first message of this thread, trimmed here ...]

On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote:
Can you paste your complete code? Did you try repartitioning/increasing the level of parallelism to speed up the processing? Since you have 16 cores, and I'm assuming your 400k records isn't bigger than a 10G dataset.
Thanks
Best Regards

On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com wrote:
I already checked, and GC is taking 1 sec for each task. Is this too much? If yes, how do I avoid it?

On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote:
Open the driver UI and see which stage is taking time; you can look at whether it's adding any GC time etc.
Re: Distinct is very slow
Akhil, any thought on this?

On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote:
No, I did not try the partitioning. Below is the full code. [... full matchAndMerge listing, quoted in the first message of this thread, trimmed here ...]

On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote:
Can you paste your complete code? Did you try repartitioning/increasing the level of parallelism to speed up the processing? Since you have 16 cores, and I'm assuming your 400k records isn't bigger than a 10G dataset.
Thanks
Best Regards

On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com wrote:
I already checked, and GC is taking 1 sec for each task. Is this too much? If yes, how do I avoid it?

On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote:
Open the driver UI and see which stage is taking time; you can look at whether it's adding any GC time etc.
Re: Distinct is very slow
At the distinct stage I will have 7000 times more elements in my RDD, so should I repartition? Its parent will definitely have fewer partitions. And how do I see the number of partitions through Java code?

On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote:
No, I did not try the partitioning. Below is the full code. [... full matchAndMerge listing, quoted in the first message of this thread, trimmed here ...]

On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote:
Can you paste your complete code? Did you try repartitioning/increasing the level of parallelism to speed up the processing? Since you have 16 cores, and I'm assuming your 400k records isn't bigger than a 10G dataset.
Thanks
Best Regards

On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com wrote:
I already checked, and GC is taking 1 sec for each task. Is this too much? If yes, how do I avoid it?
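On the question of reading the partition count from Java code: the Java RDD classes expose their partition list, so a check along these lines should work (a sketch against the Spark 1.x Java API, assuming blockingRdd and log are already in scope; not runnable on its own):

```java
// Partition count of a JavaRDD / JavaPairRDD (sketch; Spark 1.x Java API)
int numPartitions = blockingRdd.partitions().size();
log.info("blockingRdd has " + numPartitions + " partitions");
```

Logging this before and after the distinct(32) call would show directly how much the parallelism drops at that stage.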