Tamar, When PIG-646 was fixed, we did not see this behaviour. Can you file a JIRA and provide a representative script that will produce this error? If you can add more information regarding the size of your inputs, etc., it will aid us in reproducing the error.
Thanks, Santhosh -----Original Message----- From: Tamir Kamara [mailto:[email protected]] Sent: Saturday, February 21, 2009 2:38 AM To: [email protected] Subject: Re: Query Help Hey, I also seem to be having many map tasks being killed because no progress is reported. I think this is due to the DISTINCT UDF which in my case can have tens of millions of tuples to go through. I'm seeing numerous errors like this one in the task logs: 2009-02-21 11:41:57,727 WARN org.apache.pig.builtin.Distinct$Intermediate: No reporter object provided to UDF org.apache.pig.builtin.Distinct$Intermediate Later the task could be killed because it failed to report status for 600 seconds. It looks like this is a problem which has been resolved a few weeks ago with this: https://issues.apache.org/jira/browse/PIG-646 and I'm working with the latest trunk. Is this really the same issue or is it something else? Thanks, Tamir On Fri, Feb 20, 2009 at 8:14 AM, Tamir Kamara <[email protected]> wrote: > Thanks Alan! > > I've tried to switch the files in the join statement and now the first pig > job responsible for the join succeeded. however, the second pig job fail > during its map phase soon after in starts because too many map tasks fail/\/ > the error I'm getting for almost all tasks is Spill failed (more details > below). What does this mean when it happens in my map tasks in the second > pig job? > > Thanks in advance, > Tamir > > java.io.IOException: Spill failed > at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.ja va:589) > at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.ja va:570) > > at java.io.DataOutputStream.writeBoolean(Unknown Source) > at org.apache.pig.impl.io.PigNullableWritable.write(PigNullableWritable.jav a:82) > at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer .serialize(WritableSerialization.java:90) > > at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer .serialize(WritableSerialization.java:77) > at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:43 1) > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduc e$Map.collect(PigMapReduce.java:100) > > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase. runPipeline(PigMapBase.java:205) > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase. map(PigMapBase.java:194) > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduc e$Map.map(PigMapReduce.java:85) > > at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:47) > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227) > at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2198) > Caused by: java.lang.OutOfMemoryError: Java heap space > > at java.util.HashMap.resize(Unknown Source) > at java.util.HashMap.addEntry(Unknown Source) > at java.util.HashMap.put(Unknown Source) > at java.util.HashSet.add(Unknown Source) > at org.apache.pig.data.DistinctDataBag.add(DistinctDataBag.java:104) > > at org.apache.pig.builtin.Distinct.getDistinctFromNestedBags(Distinct.java: 127) > at org.apache.pig.builtin.Distinct.access$200(Distinct.java:39) > at org.apache.pig.builtin.Distinct$Intermediate.exec(Distinct.java:102) > > at org.apache.pig.builtin.Distinct$Intermediate.exec(Distinct.java:95) > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOp erators.POUserFunc.getNext(POUserFunc.java:187) > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOp erators.POUserFunc.getNext(POUserFunc.java:221) > > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOp erators.POForEach.processPlan(POForEach.java:248) > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOp erators.POForEach.getNext(POForEach.java:198) > > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOper ator.processInput(PhysicalOperator.java:226) > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOp erators.POLocalRearrange.getNext(POLocalRearrange.java:200) > > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner $Combine.processOnePackageOutput(PigCombiner.java:173) > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner $Combine.reduce(PigCombiner.java:151) > > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner $Combine.reduce(PigCombiner.java:58) > at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.combineAndSpill(MapTask .java:904) > at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.ja va:785) > > at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1600(MapTask.jav a:286) > at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask .java:712) > > > > > On Thu, Feb 19, 2009 at 7:23 PM, Alan Gates <[email protected]> wrote: > >> In general the issue we run into with join isn't the number of records in >> either table but the number of instances of a given key. Pig's join is >> written such that it materializes the keys of one input in memory and then >> streams through the keys of the second input. If it is unable to >> materialize all of the instances of the keys in memory it tries to spill >> those to disk, but that does not always succeed (as you've discovered). So >> there are a couple of things to try. >> >> 1) Reverse the order of your tables in your join statement. Pig always >> streams the keys of the last input, so if one of your inputs has less >> instances of of a given key this may help. >> >> 2) Reduce the number of maps and reducers per machine and give it all the >> memory you can. >> >> Alan. >> >> >> On Feb 18, 2009, at 11:23 AM, Tamir Kamara wrote: >> >> Hi, >>> >>> I've verified I'm using the latest version from the svn. >>> The first job (the join) still fails in the reduce section due to memory >>> problems even with 512mb memory. >>> I should also point out that the traffic1 (after the filter) is 1.5 >>> billion >>> records and the other file is about 300,000 records. The final outcome of >>> this whole job should be 50,000 after grouping by the sld. >>> The default reduce tasks per job is set to 6 and What I saw is that even >>> with parallel 12 for the join, only 3 reducers really work hard while the >>> others finishes very quickly with no problem. And after a while those 3 >>> are >>> failing due to memory problems. >>> Is this asymmetry between the reducers topical ? >>> Will higher parallel help with my issue ? >>> >>> >>> Thanks, >>> Tamir >>> >>> >>> On Wed, Feb 18, 2009 at 12:14 AM, Tamir Kamara <[email protected] >>> >wrote: >>> >>> It's the first MR job. >>>> >>>> On most machines I've only 4GB where I set the java memory to 256. With >>>> 6 >>>> maps and 2 reducers on each machine I think there's no memory to spair >>>> (datanode and tasktracker are coming up with 1024m each - which is a >>>> default >>>> i don't understand yet). >>>> But this specific error i got in a machine with 32GB ram and on which >>>> each >>>> task got 640m so this isn't enough it doesn't seem that upping the >>>> memory is >>>> the way to go for my cluster. >>>> By the way, in the other machines I get a different memory error: GC >>>> overhead limit exceeded. >>>> Do you think I should lower the map/reduce tasks to 3/1 and from the >>>> freed >>>> up memory double each ones to 512m? >>>> >>>> I forgot to attach the log before, it's attached now.. >>>> >>>> I've checked out pig 2 weeks ago following the guide in the wiki. >>>> How do I find what version it is ? >>>> >>>> >>>> >>>> On Tue, Feb 17, 2009 at 11:49 PM, Alan Gates <[email protected]> >>>> wrote: >>>> >>>> Is it the join or group by that is running out of memory? You can tell >>>>> by >>>>> whether it is the first or second map reduce job that is having >>>>> problems. >>>>> >>>>> How much memory do your grid machines have? If you can up the memory >>>>> that >>>>> will help. >>>>> >>>>> What version of pig are you running? The top of trunk code has some >>>>> changes that process a nested distinct in the combiner, which should >>>>> prevent >>>>> you from running out of memory there. >>>>> >>>>> Alan. >>>>> >>>>> >>>>> On Feb 17, 2009, at 1:30 PM, Tamir Kamara wrote: >>>>> >>>>> Thanks Alan. That is indeed better. >>>>> >>>>>> >>>>>> But now I'm getting stuck by memory problems. I think the reducers are >>>>>> out >>>>>> of heap memory. The log I attached is from a machine that runs 2 >>>>>> reducers >>>>>> simultaneously with Xmx640m, io.sort.factor 50 and io.sort.mb 200. >>>>>> I think the reducers works ok until it starts making a lot of: >>>>>> SpillableMemoryManager: low memory handler called >>>>>> >>>>>> How can I resolve this issue ? >>>>>> >>>>>> >>>>>> >>>>>> On Tue, Feb 17, 2009 at 6:43 PM, Alan Gates <[email protected]> >>>>>> wrote: >>>>>> >>>>>> A couple of pointers: >>>>>> >>>>>>> >>>>>>> Group bys where you do a foreach/generate immediately after that >>>>>>> contains >>>>>>> no UDF accomplish nothing other than reorganizing your data, so you >>>>>>> can >>>>>>> drop >>>>>>> those. >>>>>>> >>>>>>> To accomplish a distinct count, use distinct nested in a foreach. >>>>>>> >>>>>>> So your script should look like: >>>>>>> >>>>>>> traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long, >>>>>>> w:int, >>>>>>> e:int, o:int); >>>>>>> traffic1 = FOREACH traffic GENERATE domain, subnet; >>>>>>> >>>>>>> subnet_info = LOAD 'subnet_info.txt' AS (subnet:long, >>>>>>> country:chararray, >>>>>>> sld:chararray, org:chararray); >>>>>>> us_subnets = FILTER subnet_info BY country eq 'us'; >>>>>>> us_subnets1 = FOREACH us_subnets GENERATE subnet, sld; >>>>>>> >>>>>>> jr = JOIN traffic1 BY subnet, us_subnets1 by subnet; >>>>>>> >>>>>>> r0 = FOREACH jr GENERATE sld, domain; >>>>>>> >>>>>>> r3 = GROUP r0 BY domain; >>>>>>> r4 = FOREACH r3 { >>>>>>> r5 = r0.domain; >>>>>>> r6 = distinct r5; >>>>>>> GENERATE group, COUNT(r6) as domains; >>>>>>> } >>>>>>> >>>>>>> store r4 into 'sld-domains-count'; >>>>>>> >>>>>>> Alan. >>>>>>> >>>>>>> On Feb 16, 2009, at 11:36 PM, Tamir Kamara wrote: >>>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> >>>>>>>> I have the following query where i want to generate (sld, count of >>>>>>>> distinct >>>>>>>> domains). >>>>>>>> The traffic data comes with domain, subnet and the sld is obtained >>>>>>>> by a >>>>>>>> second file (with a join). >>>>>>>> I had a problem with generating this in a simple fashion and >>>>>>>> especially >>>>>>>> with >>>>>>>> the distinct domains part. Would you have a look on the script below >>>>>>>> and >>>>>>>> help me figure out if there's a way to simplify this ? >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Tamir >>>>>>>> >>>>>>>> traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long, >>>>>>>> w:int, >>>>>>>> e:int, o:int); >>>>>>>> traffic1 = FOREACH traffic GENERATE domain, subnet; >>>>>>>> >>>>>>>> traffic_by_subnet = GROUP traffic1 BY subnet; >>>>>>>> traffic_by_subnet1 = FOREACH traffic_by_subnet GENERATE group AS >>>>>>>> subnet, >>>>>>>> traffic1.domain; >>>>>>>> >>>>>>>> subnet_info = LOAD 'subnet_info.txt' AS (subnet:long, >>>>>>>> country:chararray, >>>>>>>> sld:chararray, org:chararray); >>>>>>>> us_subnets = FILTER subnet_info BY country eq 'us'; >>>>>>>> us_subnets1 = FOREACH us_subnets GENERATE subnet, sld; >>>>>>>> >>>>>>>> jr = JOIN traffic_by_subnet1 BY subnet, us_subnets1 by subnet; >>>>>>>> >>>>>>>> r0 = FOREACH jr GENERATE sld, domain; >>>>>>>> r1 = GROUP r0 BY sld; >>>>>>>> r2 = FOREACH r1 GENERATE group as sld, flatten(r0.domain) as domain; >>>>>>>> r3 = GROUP r2 BY domain; >>>>>>>> r4 = FOREACH r3 GENERATE r2.sld, COUNT(group) as domains; >>>>>>>> >>>>>>>> store r4 into 'sld-domains-count'; >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>> >>>> >> >
