Here is the (theoretical) rule of thumb for replicated join: for a replicated join to perform significantly better than the default join, the size of the replicated input should be smaller than the block size (or smaller than pig.maxCombinedSplitSize, if the property pig.splitCombination=true and pig.maxCombinedSplitSize is larger than the block size).
This is because the number of map tasks started is equal to the number of blocks (or size/pig.maxCombinedSplitSize) in the left-side input of the replicated join. The map task for each of these blocks will read the replicated input. If the replicated input is a few times larger than the block size, using replicated join will not save on IO/(de)serialization costs.

-Thejas

On 4/18/11 4:33 PM, "Thejas M Nair" <[email protected]> wrote:

For default join (hash join) -
- Increasing the parallelism of the default join should speed it up.
- Put the table which has a large number of tuples per key as the last table in the join. (Yes, this happens to be the opposite of the recommendation for replicated join!)
See -
- http://pig.apache.org/docs/r0.8.0/cookbook.html#Take+Advantage+of+Join+Optimizations
- http://pig.apache.org/docs/r0.8.0/cookbook.html#Project+Early+and+Often

For replicated join -
- I believe the reason why replicated join is performing worse than default join is the large number of maps and the large size of the replicated file. Each map task ends up reading and deserializing the replicated file (obs_relation.txt), and usually that takes the bulk of the runtime. In this case (691MB x 266 (maps) =~) 183GB of replicated input data will be read and deserialized by all the map tasks. This is actually very large compared to the size of the larger input (17GB).
To reduce the number of maps, you can use the feature introduced in https://issues.apache.org/jira/browse/PIG-1518 : ensure that you have the properties pig.splitCombination=true and pig.maxCombinedSplitSize=X, where X = size_of_obr_pm_annotation.txt/number-of-map-slots. This will ensure that all cluster slots are used and you don't have too many map tasks.

-Thejas

On 4/17/11 6:03 AM, "byambajargal" <[email protected]> wrote:

> Hello ...
> I have a cluster with 11 nodes; each of them has 16 GB RAM, a 6-core CPU, and a
> 1 TB HDD, and I use the Cloudera distribution CDH3B4 with Pig.
> I have two Pig join queries, which are a Parallel and a Replicated version of the Pig join.
>
> Theoretically the Replicated join could be faster than the Parallel join, but in
> my case the Parallel one is faster.
> I am wondering why the replicated join is so slow. I want to improve
> the performance of both queries. Could you check the details of the queries?
>
> thanks
>
> Byambajargal
>
>
> ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',') AS (element_id:long,concept_id:long);
> REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',') AS (id:long,concept_id:long,parent_concept_id:long);
> ISA_ANNO = join ANNO by concept_id, REL by concept_id PARALLEL 10;
> ISA_ANNO_T = GROUP ISA_ANNO ALL;
> ISA_ANNO_C = foreach ISA_ANNO_T generate COUNT($1);
> dump ISA_ANNO_C;
>
> HadoopVersion  PigVersion    UserId  StartedAt            FinishedAt           Features
> 0.20.2-CDH3B4  0.8.0-CDH3B4  haisen  2011-04-15 10:31:36  2011-04-15 10:43:22  HASH_JOIN,GROUP_BY
>
> Success!
>
> Job Stats (time in seconds):
> JobId                  Maps  Reduces  MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias                  Feature            Outputs
> job_201103122121_0084  277   10       15          5           11          417            351            379            ANNO,ISA_ANNO,REL      HASH_JOIN
> job_201103122121_0085  631   1        10          5           7           242            242            242            ISA_ANNO_C,ISA_ANNO_T  GROUP_BY,COMBINER  hdfs://haisen11:54310/tmp/temp281466632/tmp-171526868,
>
> Input(s):
> Successfully read 24153638 records from: "/datastorm/task3/obs_relation.txt"
> Successfully read 442049697 records from: "/datastorm/task3/obr_pm_annotation.txt"
>
> Output(s):
> Successfully stored 1 records (14 bytes) in: "hdfs://haisen11:54310/tmp/temp281466632/tmp-171526868"
>
> Counters:
> Total records written : 1
> Total bytes written : 14
> Spillable Memory Manager spill count : 0
> Total bags proactively spilled: 41
> Total records proactively spilled: 8781684
>
> Job DAG:
> job_201103122121_0084 -> job_201103122121_0085,
> job_201103122121_0085
>
>
> 2011-04-15 10:43:22,403 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
> 2011-04-15 10:43:22,419 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
> 2011-04-15 10:43:22,419 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
> (844872046)
>
>
> *Using replicated version*
> ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',') AS (element_id:long,concept_id:long);
> REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',') AS (id:long,concept_id:long,parent_concept_id:long);
> ISA_ANNO = join ANNO by concept_id, REL by concept_id USING 'replicated';
> ISA_ANNO_T = GROUP ISA_ANNO ALL;
> ISA_ANNO_C = foreach ISA_ANNO_T generate COUNT($1);
> dump ISA_ANNO_C;
>
> HadoopVersion  PigVersion    UserId  StartedAt            FinishedAt           Features
> 0.20.2-CDH3B4  0.8.0-CDH3B4  haisen  2011-04-15 10:57:37  2011-04-15 11:26:32  REPLICATED_JOIN,GROUP_BY
>
> Success!
>
> Job Stats (time in seconds):
> JobId                  Maps  Reduces  MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias                                Feature                            Outputs
> job_201103122121_0088  11    0        11          5           9           0              0              0              REL                                  MAP_ONLY
> job_201103122121_0089  266   1        151         101         123         1566           1566           1566           ANNO,ISA_ANNO,ISA_ANNO_C,ISA_ANNO_T  REPLICATED_JOIN,GROUP_BY,COMBINER  hdfs://haisen11:54310/tmp/temp-1729753626/tmp-61569771,
>
> Input(s):
> Successfully read 442049697 records (17809735666 bytes) from: "/datastorm/task3/obr_pm_annotation.txt"
> Successfully read 24153638 records (691022731 bytes) from: "/datastorm/task3/obs_relation.txt"
>
> Output(s):
> Successfully stored 1 records (14 bytes) in: "hdfs://haisen11:54310/tmp/temp-1729753626/tmp-61569771"
>
> Counters:
> Total records written : 1
> Total bytes written : 14
> Spillable Memory Manager spill count : 0
> Total bags proactively spilled: 0
> Total records proactively spilled: 0
>
> Job DAG:
> job_201103122121_0088 -> job_201103122121_0089,
> job_201103122121_0089
>
>
> 2011-04-15 11:26:32,751 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
> 2011-04-15 11:26:32,889 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
> 2011-04-15 11:26:32,899 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
> (844872046)
>
>
> ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',') AS (element_id:long,concept_id:long);
> REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',') AS (id:long,concept_id:long,parent_concept_id:long);
> ISA_ANNO = join ANNO by concept_id, REL by concept_id PARALLEL 10;
> store ISA_ANNO into 'outputdel';
>
> HadoopVersion  PigVersion    UserId  StartedAt            FinishedAt           Features
> 0.20.2-CDH3B4  0.8.0-CDH3B4  haisen  2011-04-15 16:08:52  2011-04-15 16:16:26  HASH_JOIN
>
> Success!
>
> Job Stats (time in seconds):
> JobId                  Maps  Reduces  MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias              Feature    Outputs
> job_201103122121_0090  277   10       15          6           11          432            353            394            ANNO,ISA_ANNO,REL  HASH_JOIN  hdfs://haisen11:54310/user/haisen/outputdel,
>
> Input(s):
> Successfully read 24153638 records from: "/datastorm/task3/obs_relation.txt"
> Successfully read 442049697 records from: "/datastorm/task3/obr_pm_annotation.txt"
>
> Output(s):
> Successfully stored 844872046 records (34500196186 bytes) in: "hdfs://haisen11:54310/user/haisen/outputdel"
>
> Counters:
> Total records written : 844872046
> Total bytes written : 34500196186
> Spillable Memory Manager spill count : 0
> Total bags proactively spilled: 41
> Total records proactively spilled: 8537764
>
> Job DAG:
> job_201103122121_0090
>
> 2011-04-15 16:16:26,320 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
>
>
> ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',') AS (element_id:long,concept_id:long);
> REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',') AS (id:long,concept_id:long,parent_concept_id:long);
> ISA_ANNO = join ANNO by concept_id, REL by concept_id USING 'replicated';
> store ISA_ANNO into 'outputdel';
>
>
> HadoopVersion  PigVersion    UserId  StartedAt            FinishedAt           Features
> 0.20.2-CDH3B4  0.8.0-CDH3B4  haisen  2011-04-15 16:32:20  2011-04-15 17:02:16  REPLICATED_JOIN
>
> Success!
>
> Job Stats (time in seconds):
> JobId                  Maps  Reduces  MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias          Feature                   Outputs
> job_201103122121_0093  11    0        10          5           9           0              0              0              REL            MAP_ONLY
> job_201103122121_0094  266   0        156         96          128         0              0              0              ANNO,ISA_ANNO  REPLICATED_JOIN,MAP_ONLY  hdfs://haisen11:54310/user/haisen/outputdel1,
>
> Input(s):
> Successfully read 24153638 records (691022731 bytes) from: "/datastorm/task3/obs_relation.txt"
> Successfully read 442049697 records (17809735666 bytes) from: "/datastorm/task3/obr_pm_annotation.txt"
>
> Output(s):
> Successfully stored 844872046 records (34500196186 bytes) in: "hdfs://haisen11:54310/user/haisen/outputdel1"
>
> Counters:
> Total records written : 844872046
> Total bytes written : 34500196186
> Spillable Memory Manager spill count : 0
> Total bags proactively spilled: 0
> Total records proactively spilled: 0
>
> Job DAG:
> job_201103122121_0093 -> job_201103122121_0094,
> job_201103122121_0094
>
>
> 2011-04-15 17:02:16,651 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
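
For anyone who wants to redo the arithmetic from the replies above, here is a rough Python sketch. The byte counts come from the job output quoted in this thread. The 64 MiB block size and the figure of 4 map slots per node are assumptions, not something stated in the thread, and the function names are made up for illustration. The number of maps Pig actually starts with split combination enabled may differ from this estimate.

```python
import math

# Byte counts taken from the "Input(s)" section of the quoted job output.
REPLICATED_BYTES = 691_022_731      # obs_relation.txt (replicated input)
LEFT_BYTES = 17_809_735_666         # obr_pm_annotation.txt (left-side input)

# Assumptions (not stated in the thread): 64 MiB HDFS blocks and
# 4 map slots on each of the 11 nodes.
BLOCK_BYTES = 64 * 1024 * 1024
MAP_SLOTS = 11 * 4


def num_maps(left_bytes, split_bytes):
    """Estimate the map count as one map per split of the left-side input."""
    return math.ceil(left_bytes / split_bytes)


def replicated_read_bytes(replicated_bytes, maps):
    """Total replicated bytes read when every map loads its own copy."""
    return replicated_bytes * maps


def rule_of_thumb_ok(replicated_bytes, split_bytes):
    """Rule of thumb: the replicated input should be smaller than one split."""
    return replicated_bytes < split_bytes


# Without split combination: one map per block.
maps = num_maps(LEFT_BYTES, BLOCK_BYTES)
total = replicated_read_bytes(REPLICATED_BYTES, maps)
print(f"maps={maps}, replicated data read={total / 1e9:.1f} GB")

# With pig.maxCombinedSplitSize = left input size / number of map slots.
combined_split = math.ceil(LEFT_BYTES / MAP_SLOTS)
maps_combined = num_maps(LEFT_BYTES, combined_split)
total_combined = replicated_read_bytes(REPLICATED_BYTES, maps_combined)
print(f"maxCombinedSplitSize={combined_split}, maps={maps_combined}, "
      f"replicated data read={total_combined / 1e9:.1f} GB")
print("rule of thumb satisfied:",
      rule_of_thumb_ok(REPLICATED_BYTES, combined_split))
```

Under these assumptions the estimate reproduces the 266 maps seen in the logs, and it suggests that combining splits would cut the replicated read volume considerably while still leaving the replicated file larger than one combined split.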
