No I did not tried the partitioning below is the full code

public static  void  matchAndMerge(JavaRDD<VendorRecord>
matchRdd,JavaSparkContext jsc) throws IOException{
 long start = System.currentTimeMillis();
  JavaPairRDD<Long, MatcherReleventData> RddForMarch
=matchRdd.zipWithIndex().mapToPair(new
PairFunction<Tuple2<VendorRecord,Long>, Long, MatcherReleventData>() {

@Override
public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t)
throws Exception {
MatcherReleventData matcherData = new MatcherReleventData();
Tuple2<Long, MatcherReleventData> tuple = new Tuple2<Long,
MatcherReleventData>(t._2,
matcherData.convertVendorDataToMatcherData(t._1));
 return tuple;
}

}).cache();
 log.info("after index"+RddForMarch.take(1));
 Map<Long, MatcherReleventData> tmp =RddForMarch.collectAsMap();
Map<Long, MatcherReleventData> matchData = new HashMap<Long,
MatcherReleventData>(tmp);
final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
jsc.broadcast(matchData);

JavaPairRDD<Long,String> blockingRdd = RddForMarch.flatMapValues(new
Function<MatcherReleventData, Iterable<String>>(){

@Override
public Iterable<String> call(MatcherReleventData v1)
throws Exception {
List<String> values = new ArrayList<String>();
HelperUtilities helper1 = new HelperUtilities();
MatcherKeys matchkeys=helper1.getBlockinkeys(v1);
if(matchkeys.get_companyName() !=null){
values.add(matchkeys.get_companyName());
}
if(matchkeys.get_phoneNumberr() !=null){
values.add(matchkeys.get_phoneNumberr());
}
if(matchkeys.get_zipCode() !=null){
values.add(matchkeys.get_zipCode());
}
if(matchkeys.getM_domain() !=null){
values.add(matchkeys.getM_domain());
}
  return values;
}
 });
 log.info("blocking RDD is"+blockingRdd.count());
int count=0;
log.info("Starting printing");
  for (Tuple2<Long, String> entry : blockingRdd.collect()) {

  log.info(entry._1() + ":" + entry._2());
      count++;
    }
  log.info("total count"+count);
 JavaPairRDD<Long,Integer> completeDataToprocess=blockingRdd.flatMapValues(
new Function<String, Iterable<Integer>>(){

@Override
public Iterable<Integer> call(String v1) throws Exception {
return ckdao.getSingelkeyresult(v1);
}
 }).distinct(32);
 log.info("after hbase count is"+completeDataToprocess.count());
 log.info("data for process"+completeDataToprocess.take(1));
 JavaPairRDD<Long, Tuple2<Integer, Double>> withScore
=completeDataToprocess.mapToPair( new PairFunction<Tuple2<Long,Integer>,
Long, Tuple2<Integer, Double>>(){

@Override
public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t)
throws Exception {
Scoring scoreObj = new Scoring();
double score =scoreObj.computeMatchScore(companyDAO.get(t._2()),
dataMatchGlobal.getValue().get(t._1()));
Tuple2<Integer, Double> maptuple = new Tuple2<Integer, Double>(t._2(),
score);
Tuple2<Long, Tuple2<Integer, Double>> tuple = new Tuple2<Long,
Tuple2<Integer,Double>>(t._1(), maptuple);
return tuple;
}
 });
 log.info("with score tuple is"+withScore.take(1));
 JavaPairRDD<Long, Tuple2<Integer,Double>> maxScoreRDD
=withScore.reduceByKey( new Function2<Tuple2<Integer,Double>,
Tuple2<Integer,Double>, Tuple2<Integer,Double>>(){

@Override
public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
Tuple2<Integer, Double> v2) throws Exception {
 int res =v1._2().compareTo(v2._2());
if(res >0){
 Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v1._1(),
v1._2());
return result;
 }
else if(res<0){
Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(),
v2._2());
return result;
}
else{
Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(),
v2._2());
return result;
}
  }
 });
 log.info("max score RDD"+maxScoreRDD.take(10));

 maxScoreRDD.foreach( new
VoidFunction<Tuple2<Long,Tuple2<Integer,Double>>>(){

@Override
public void call(Tuple2<Long, Tuple2<Integer, Double>> t)
throws Exception {
MatcherReleventData matchedData=dataMatchGlobal.getValue().get(t._1());
log.info("broadcast is"+dataMatchGlobal.getValue().get(t._1()));
//Set the score for better understanding of merge
matchedData.setScore(t._2()._2());
vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(),"Souce_id");
 }
 });
 log.info("took " + (System.currentTimeMillis() - start) + " mills to run
matcher");



 }


On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Can you paste your complete code? Did you try repartioning/increasing
> level of parallelism to speed up the processing. Since you have 16 cores,
> and I'm assuming your 400k records isn't bigger than a 10G dataset.
>
> Thanks
> Best Regards
>
> On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <gangele...@gmail.com>
> wrote:
>
>> I already checked and G is taking 1 secs for each task. is this too much?
>> if yes how to avoid this?
>>
>>
>> On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>>> Open the driver ui and see which stage is taking time, you can look
>>> whether its adding any GC time etc.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <gangele...@gmail.com
>>> > wrote:
>>>
>>>> Hi All I have below code whether distinct is running for more time.
>>>>
>>>> blockingRdd is the combination of <Long,String> and it will have 400K
>>>> records
>>>> JavaPairRDD<Long,Integer>
>>>> completeDataToprocess=blockingRdd.flatMapValues( new Function<String,
>>>> Iterable<Integer>>(){
>>>>
>>>> @Override
>>>> public Iterable<Integer> call(String v1) throws Exception {
>>>> return ckdao.getSingelkeyresult(v1);
>>>> }
>>>>  }).distinct(32);
>>>>
>>>> I am running distinct on 800K records and its taking 2 hours on 16
>>>> cores and 20 GB RAM.
>>>>
>>>
>>>
>>
>>
>>
>>
>

Reply via email to