No, I did not try the partitioning. Below is the full code:

/**
 * Matches every vendor record against candidate companies and persists the
 * best-scoring match per record.
 *
 * Pipeline: index records -> broadcast them -> expand to blocking keys ->
 * look up candidate ids in HBase -> score each (record, candidate) pair ->
 * reduce to the max score per record -> write the winner via the DAO.
 *
 * @param matchRdd vendor records to match
 * @param jsc      Spark context used to create the broadcast variable
 * @throws IOException propagated from the HBase-backed DAOs
 */
public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, JavaSparkContext jsc) throws IOException {
    long start = System.currentTimeMillis();

    // Index every record and convert it to the matcher representation.
    // cache() because this RDD feeds both collectAsMap() and flatMapValues() below.
    JavaPairRDD<Long, MatcherReleventData> rddForMatch = matchRdd.zipWithIndex().mapToPair(
            new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
                @Override
                public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t) throws Exception {
                    MatcherReleventData matcherData = new MatcherReleventData();
                    return new Tuple2<Long, MatcherReleventData>(t._2, matcherData.convertVendorDataToMatcherData(t._1));
                }
            }).cache();
    log.info("after index " + rddForMatch.take(1));

    // NOTE(review): collectAsMap() pulls the entire data set onto the driver;
    // workable for a few hundred thousand small records, but it will not scale
    // past driver memory — confirm record sizes before growing the input.
    Map<Long, MatcherReleventData> matchData =
            new HashMap<Long, MatcherReleventData>(rddForMatch.collectAsMap());
    final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

    // Expand each record into its non-null blocking keys
    // (company name, phone number, zip code, domain).
    JavaPairRDD<Long, String> blockingRdd = rddForMatch.flatMapValues(
            new Function<MatcherReleventData, Iterable<String>>() {
                @Override
                public Iterable<String> call(MatcherReleventData v1) throws Exception {
                    List<String> values = new ArrayList<String>();
                    MatcherKeys matchKeys = new HelperUtilities().getBlockinkeys(v1);
                    if (matchKeys.get_companyName() != null) {
                        values.add(matchKeys.get_companyName());
                    }
                    if (matchKeys.get_phoneNumberr() != null) {
                        values.add(matchKeys.get_phoneNumberr());
                    }
                    if (matchKeys.get_zipCode() != null) {
                        values.add(matchKeys.get_zipCode());
                    }
                    if (matchKeys.getM_domain() != null) {
                        values.add(matchKeys.getM_domain());
                    }
                    return values;
                }
            });
    // FIX: the previous version also ran blockingRdd.collect() and logged every
    // single pair in a driver-side loop just to count them. That ships the whole
    // RDD to the driver and dominated the runtime; count() already gives the total.
    log.info("blocking RDD count is " + blockingRdd.count());

    // Candidate company-id lookup in HBase, then dedupe. distinct(32) forces a
    // shuffle into 32 partitions; raise this if 32 tasks leave cores idle.
    // FIX: cache() added — count(), take(1) and the scoring stage below each
    // trigger a job, and without caching every job re-ran the HBase lookups.
    JavaPairRDD<Long, Integer> completeDataToProcess = blockingRdd.flatMapValues(
            new Function<String, Iterable<Integer>>() {
                @Override
                public Iterable<Integer> call(String v1) throws Exception {
                    return ckdao.getSingelkeyresult(v1);
                }
            }).distinct(32).cache();
    log.info("after hbase count is " + completeDataToProcess.count());
    log.info("data for process " + completeDataToProcess.take(1));

    // Score every (record, candidate) pair against the broadcast matcher data.
    JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToProcess.mapToPair(
            new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {
                @Override
                public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t) throws Exception {
                    Scoring scoreObj = new Scoring();
                    double score = scoreObj.computeMatchScore(
                            companyDAO.get(t._2()), dataMatchGlobal.getValue().get(t._1()));
                    return new Tuple2<Long, Tuple2<Integer, Double>>(
                            t._1(), new Tuple2<Integer, Double>(t._2(), score));
                }
            });
    log.info("with score tuple is " + withScore.take(1));

    // Keep, per record id, the candidate with the highest score. Ties keep the
    // second argument, exactly as the original three-way branch did.
    JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
            new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
                @Override
                public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
                                                    Tuple2<Integer, Double> v2) throws Exception {
                    // FIX: tuples are immutable — return the winner directly instead
                    // of allocating identical copies in three duplicated branches.
                    return v1._2().compareTo(v2._2()) > 0 ? v1 : v2;
                }
            });
    log.info("max score RDD " + maxScoreRDD.take(10));

    // Persist the best match for every record.
    maxScoreRDD.foreach(
            new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
                @Override
                public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
                    MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
                    log.info("broadcast is " + matchedData);
                    // Set the score for better understanding of the merge.
                    // NOTE(review): this mutates the executor-local broadcast copy in
                    // place; safe only while exactly one task touches each key — confirm.
                    matchedData.setScore(t._2()._2());
                    // NOTE(review): "Souce_id" looks like a typo for "Source_id" —
                    // verify against the column the DAO expects before renaming it.
                    vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(), "Souce_id");
                }
            });
    log.info("took " + (System.currentTimeMillis() - start) + " mills to run matcher");
}

On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com> wrote: > Can you paste your complete code?
Did you try repartitioning/increasing > level of parallelism to speed up the processing? Since you have 16 cores, > and I'm assuming your 400k records isn't bigger than a 10G dataset. > > Thanks > Best Regards > > On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <gangele...@gmail.com> > wrote: > >> I already checked and GC is taking 1 sec for each task. Is this too much? >> If yes, how to avoid this? >> >> >> On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com> wrote: >> >>> Open the driver UI and see which stage is taking time; you can look at >>> whether it's adding any GC time etc. >>> >>> Thanks >>> Best Regards >>> >>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <gangele...@gmail.com >>> > wrote: >>> >>>> Hi All, I have the below code where distinct is running for a long time. >>>> >>>> blockingRdd is the combination of <Long,String> and it will have 400K >>>> records >>>> JavaPairRDD<Long,Integer> >>>> completeDataToprocess=blockingRdd.flatMapValues( new Function<String, >>>> Iterable<Integer>>(){ >>>> >>>> @Override >>>> public Iterable<Integer> call(String v1) throws Exception { >>>> return ckdao.getSingelkeyresult(v1); >>>> } >>>> }).distinct(32); >>>> >>>> I am running distinct on 800K records and it's taking 2 hours on 16 >>>> cores and 20 GB RAM. >>> >>> >> >> >> >> >