Pig always runs the same piece of code, generically speaking. There is no codegen. What actually happens is driven by serialized DAGs of the physical operators. This does seem like a bug (or an inefficiency, at least): we shouldn't need 3 mappers to do 3 filters of the same data and re-group.
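To make that concrete, here is a loose Python sketch of the idea (the class and function names are made up for illustration; they only roughly mirror Pig's POFilter/POLocalRearrange operators, and this is not Pig's actual code): every map task runs the same generic interpreter, and its behavior comes entirely from a deserialized plan of physical operators.

```python
# Hypothetical sketch, NOT Pig source: shows how one generic mapper can run
# any pipeline if the behavior lives in a serialized plan of operators.
import pickle

class Filter:
    """Stand-in for a physical filter operator (like POFilter)."""
    def __init__(self, field, op, constant):
        self.field, self.op, self.constant = field, op, constant
    def accept(self, tup):
        value = int(tup[self.field])
        return value > self.constant if self.op == '>' else value < self.constant

class LocalRearrange:
    """Stand-in for POLocalRearrange: emit (key, tuple) pairs for the shuffle."""
    def __init__(self, key_field):
        self.key_field = key_field
    def emit(self, tup):
        return (tup[self.key_field], tup)

def generic_map_task(serialized_plan, input_records):
    """The one piece of code every mapper runs: deserialize the plan, then
    push each input tuple through every pipeline in it."""
    pipelines = pickle.loads(serialized_plan)
    out = []
    for tup in input_records:
        for source_index, (flt, rearrange) in enumerate(pipelines):
            if flt.accept(tup):
                # Tag each record with its pipeline index so the reduce side
                # can cogroup tuples by their source relation.
                key, value = rearrange.emit(tup)
                out.append((key, source_index, value))
    return out

# Two filter pipelines (hour > 11 and hour < 13), mirroring the script
# discussed below in this thread; field 1 is the hour column.
plan = pickle.dumps([(Filter(1, '>', 11), LocalRearrange(1)),
                     (Filter(1, '<', 13), LocalRearrange(1))])
records = [('alice', '12', 'q1'), ('bob', '10', 'q2')]
mapper_output = generic_map_task(plan, records)
```

Both pipelines run in a single pass over the input here, which is exactly the reuse-the-scan behavior the thread argues Pig should be doing.

D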
On Thu, Mar 8, 2012 at 7:51 PM, Yongzhi Wang <wang.yongzhi2...@gmail.com> wrote:
> So should the two map tasks be running the same piece of code, but reading
> different input? Or are the two tasks actually running different code?
> Is there any way that I can track the real map and reduce functions that
> Pig sends to the workers?
> Or can you tell me which piece of source code in the Pig project generates
> the map and reduce tasks sent to the slave workers?
>
> Thanks!
>
>
> On Thu, Mar 8, 2012 at 8:23 PM, Dmitriy Ryaboy <dvrya...@gmail.com> wrote:
>
>> That's what I get for reading explain plans on an iPhone. Sorry.
>>
>> So, yeah, the cogrouping is happening as part of the shuffle.
>> It seems like Pig is figuring a task per t1 and t2 (and then a logical
>> union of the two, which just indicates that tuples from both relations
>> go into the same meta-relation tagged with their source, which will
>> then get cogrouped). It shouldn't; it should be able to reuse the same
>> scan of the source data for both t1 and t2.
>>
>> D
>>
>> On Thu, Mar 8, 2012 at 9:13 AM, Yongzhi Wang <wang.yongzhi2...@gmail.com> wrote:
>> > Thanks, Dmitriy. I understand that there is only one job containing
>> > 2 map tasks and 1 reduce task. But the problem is that even if I only
>> > have one input file with a size of 1.4 KB (fewer than 50 rows of
>> > records), the stats data still show that it needs 2 map tasks.
>> >
>> > The union operation is shown at the top of the Map plan tree:
>> > (Union[tuple] - scope-85)
>> >
>> > #--------------------------------------------------
>> > # Map Reduce Plan
>> > #--------------------------------------------------
>> > MapReduce node scope-84
>> > Map Plan
>> > Union[tuple] - scope-85
>> > |
>> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-73
>> > |   |   |
>> > |   |   Project[bytearray][1] - scope-74
>> > |   |
>> > |   |---part1: Filter[bag] - scope-59
>> > |   |   |
>> > |   |   Greater Than[boolean] - scope-63
>> > |   |   |
>> > |   |   |---Cast[int] - scope-61
>> > |   |   |   |
>> > |   |   |   |---Project[bytearray][1] - scope-60
>> > |   |   |
>> > |   |   |---Constant(11) - scope-62
>> > |   |
>> > |   |---my_raw: New For Each(false,false,false)[bag] - scope-89
>> > |   |   |
>> > |   |   Project[bytearray][0] - scope-86
>> > |   |   |
>> > |   |   Project[bytearray][1] - scope-87
>> > |   |   |
>> > |   |   Project[bytearray][2] - scope-88
>> > |   |
>> > |   |---my_raw: Load(hdfs://master:54310/user/root/houred-small:PigStorage(' ')) - scope-90
>> > |
>> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-75
>> >     |   |
>> >     |   Project[bytearray][1] - scope-76
>> >     |
>> >     |---part2: Filter[bag] - scope-66
>> >     |   |
>> >     |   Less Than[boolean] - scope-70
>> >     |   |
>> >     |   |---Cast[int] - scope-68
>> >     |   |   |
>> >     |   |   |---Project[bytearray][1] - scope-67
>> >     |   |
>> >     |   |---Constant(13) - scope-69
>> >     |
>> >     |---my_raw: New For Each(false,false,false)[bag] - scope-94
>> >     |   |
>> >     |   Project[bytearray][0] - scope-91
>> >     |   |
>> >     |   Project[bytearray][1] - scope-92
>> >     |   |
>> >     |   Project[bytearray][2] - scope-93
>> >     |
>> >     |---my_raw: Load(hdfs://master:54310/user/root/houred-small:PigStorage(' ')) - scope-95
>> > --------
>> > Reduce Plan
>> > result: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-77
>> > |
>> > |---result: Package[tuple]{bytearray} - scope-72
>> > --------
>> > Global sort: false
>> >
>> >
>> > On Thu, Mar 8, 2012 at 1:14 AM, Dmitriy Ryaboy <dvrya...@gmail.com> wrote:
>> >
>> >> You are confusing map and reduce tasks with mapreduce jobs. Your pig
>> >> script resulted in a single mapreduce job. The number of map tasks
>> >> was 2, based on input size -- it has little to do with the actual
>> >> operators you used.
>> >>
>> >> There is no union operator involved, so I am not sure what you are
>> >> referring to with that.
>> >>
>> >> On Mar 7, 2012, at 8:09 AM, Yongzhi Wang <wang.yongzhi2...@gmail.com> wrote:
>> >>
>> >> > Hi there,
>> >> >
>> >> > I tried to use the "explain" syntax, but the MapReduce plan sometimes
>> >> > confuses me.
>> >> >
>> >> > I tried the script below:
>> >> >
>> >> > my_raw = LOAD './houred-small' USING PigStorage('\t') AS (user, hour, query);
>> >> > part1 = filter my_raw by hour > 11;
>> >> > part2 = filter my_raw by hour < 13;
>> >> > result = cogroup part1 by hour, part2 by hour;
>> >> > dump result;
>> >> > explain result;
>> >> >
>> >> > The job stats, shown below, indicate that there are 2 map tasks and
>> >> > 1 reduce task. But I don't know how each map task maps to the
>> >> > MapReduce plan shown below. It seems each map task just does one
>> >> > filter and rearrange, but in which phase is the union operation
>> >> > done? The shuffle phase? In that case, the two map tasks actually
>> >> > do different filter work. Is that possible? Or is my guess wrong?
>> >> >
>> >> > So, back to the question: is there any way that I can see the
>> >> > actual map and reduce tasks executed by Pig?
>> >> >
>> >> > Job Stats (time in seconds):
>> >> > JobId                  Maps  Reduces  MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias                      Feature  Outputs
>> >> > job_201203021230_0038  2     1        3           3           3           12             12             12             my_raw,part1,part2,result  COGROUP  hdfs://master:54310/tmp/temp626037557/tmp-1661404166,
>> >> >
>> >> > The mapreduce plan shows as below:
>> >> > #--------------------------------------------------
>> >> > # Map Reduce Plan
>> >> > #--------------------------------------------------
>> >> > MapReduce node scope-84
>> >> > Map Plan
>> >> > Union[tuple] - scope-85
>> >> > |
>> >> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-73
>> >> > |   |   |
>> >> > |   |   Project[bytearray][1] - scope-74
>> >> > |   |
>> >> > |   |---part1: Filter[bag] - scope-59
>> >> > |   |   |
>> >> > |   |   Greater Than[boolean] - scope-63
>> >> > |   |   |
>> >> > |   |   |---Cast[int] - scope-61
>> >> > |   |   |   |
>> >> > |   |   |   |---Project[bytearray][1] - scope-60
>> >> > |   |   |
>> >> > |   |   |---Constant(11) - scope-62
>> >> > |   |
>> >> > |   |---my_raw: New For Each(false,false,false)[bag] - scope-89
>> >> > |   |   |
>> >> > |   |   Project[bytearray][0] - scope-86
>> >> > |   |   |
>> >> > |   |   Project[bytearray][1] - scope-87
>> >> > |   |   |
>> >> > |   |   Project[bytearray][2] - scope-88
>> >> > |   |
>> >> > |   |---my_raw: Load(hdfs://master:54310/user/root/houred-small:PigStorage(' ')) - scope-90
>> >> > |
>> >> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-75
>> >> >     |   |
>> >> >     |   Project[bytearray][1] - scope-76
>> >> >     |
>> >> >     |---part2: Filter[bag] - scope-66
>> >> >     |   |
>> >> >     |   Less Than[boolean] - scope-70
>> >> >     |   |
>> >> >     |   |---Cast[int] - scope-68
>> >> >     |   |   |
>> >> >     |   |   |---Project[bytearray][1] - scope-67
>> >> >     |   |
>> >> >     |   |---Constant(13) - scope-69
>> >> >     |
>> >> >     |---my_raw: New For Each(false,false,false)[bag] - scope-94
>> >> >     |   |
>> >> >     |   Project[bytearray][0] - scope-91
>> >> >     |   |
>> >> >     |   Project[bytearray][1] - scope-92
>> >> >     |   |
>> >> >     |   Project[bytearray][2] - scope-93
>> >> >     |
>> >> >     |---my_raw: Load(hdfs://master:54310/user/root/houred-small:PigStorage(' ')) - scope-95
>> >> > --------
>> >> > Reduce Plan
>> >> > result: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-77
>> >> > |
>> >> > |---result: Package[tuple]{bytearray} - scope-72
>> >> > --------
>> >> > Global sort: false
>> >> > ----------------
>> >> >
>> >> > Thanks!
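As an aside for readers of this thread, the COGROUP in the script above has simple semantics that can be modeled in a few lines of Python (this is an illustrative model only, not Pig code; the `cogroup` helper and sample rows are made up): for each hour value, the result holds one bag of matching `part1` tuples and one bag of matching `part2` tuples, which is what the `Package[tuple]` operator in the reduce plan assembles after the shuffle.

```python
# Illustrative model (NOT Pig source) of what
#   result = cogroup part1 by hour, part2 by hour;
# computes: for each key, a bag of part1 tuples and a bag of part2 tuples.
from collections import defaultdict

def cogroup(part1, part2, key_index=1):
    # One pair of bags per key; tuples keep their source relation separate,
    # like the source-tagged meta-relation described earlier in the thread.
    groups = defaultdict(lambda: ([], []))
    for tup in part1:
        groups[tup[key_index]][0].append(tup)
    for tup in part2:
        groups[tup[key_index]][1].append(tup)
    return dict(groups)

# Sample rows in (user, hour, query) layout, matching the script's schema.
rows = [('u1', 12, 'q1'), ('u2', 10, 'q2'), ('u3', 14, 'q3')]
part1 = [t for t in rows if t[1] > 11]   # part1 = filter my_raw by hour > 11
part2 = [t for t in rows if t[1] < 13]   # part2 = filter my_raw by hour < 13
result = cogroup(part1, part2)
```

Note that a key appearing in only one input still produces a group, just with an empty bag on the other side; that is also how Pig's COGROUP behaves.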