Tamar,

When PIG-646 was fixed, we did not see this behaviour. Can you file a
JIRA and provide a representative script that will produce this error?
If you can add more information regarding the size of your inputs, etc.,
it will aid us in reproducing the error.

Thanks,
Santhosh 

-----Original Message-----
From: Tamir Kamara [mailto:[email protected]] 
Sent: Saturday, February 21, 2009 2:38 AM
To: [email protected]
Subject: Re: Query Help

Hey,

I also seem to have many map tasks killed because no progress is
reported. I think this is due to the DISTINCT UDF, which in my case can
have tens of millions of tuples to go through.
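
For reference, the pattern that exercises
org.apache.pig.builtin.Distinct$Intermediate is a DISTINCT nested in a
FOREACH, which Pig's combiner optimization evaluates map-side. A schematic
sketch (aliases here are illustrative, not from my actual script):

```pig
-- Nested DISTINCT: the combiner applies Distinct's Initial/Intermediate
-- stages to partial bags on the map side before the final reduce.
grp = GROUP records BY key;
counts = FOREACH grp {
    vals = DISTINCT records.val;
    GENERATE group, COUNT(vals) AS distinct_vals;
}
```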

I'm seeing numerous errors like this one in the task logs:

2009-02-21 11:41:57,727 WARN org.apache.pig.builtin.Distinct$Intermediate: No reporter object provided to UDF org.apache.pig.builtin.Distinct$Intermediate

Later the task could be killed because it failed to report status for
600 seconds.

It looks like this is a problem that was resolved a few weeks ago with
https://issues.apache.org/jira/browse/PIG-646
and I'm working with the latest trunk.

Is this really the same issue or is it something else?

Thanks,
Tamir


On Fri, Feb 20, 2009 at 8:14 AM, Tamir Kamara <[email protected]>
wrote:

> Thanks Alan!
>
> I've tried switching the files in the join statement, and now the first
> pig job, the one responsible for the join, succeeded. However, the second
> pig job fails during its map phase soon after it starts because too many
> map tasks fail. The error I'm getting for almost all tasks is "Spill
> failed" (more details below). What does this mean when it happens in my
> map tasks in the second pig job?
>
> Thanks in advance,
> Tamir
>
> java.io.IOException: Spill failed
>       at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:589)
>       at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:570)
>       at java.io.DataOutputStream.writeBoolean(Unknown Source)
>       at org.apache.pig.impl.io.PigNullableWritable.write(PigNullableWritable.java:82)
>       at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:90)
>       at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:77)
>       at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:431)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.collect(PigMapReduce.java:100)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:205)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:194)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.map(PigMapReduce.java:85)
>       at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:47)
>       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
>       at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2198)
> Caused by: java.lang.OutOfMemoryError: Java heap space
>       at java.util.HashMap.resize(Unknown Source)
>       at java.util.HashMap.addEntry(Unknown Source)
>       at java.util.HashMap.put(Unknown Source)
>       at java.util.HashSet.add(Unknown Source)
>       at org.apache.pig.data.DistinctDataBag.add(DistinctDataBag.java:104)
>       at org.apache.pig.builtin.Distinct.getDistinctFromNestedBags(Distinct.java:127)
>       at org.apache.pig.builtin.Distinct.access$200(Distinct.java:39)
>       at org.apache.pig.builtin.Distinct$Intermediate.exec(Distinct.java:102)
>       at org.apache.pig.builtin.Distinct$Intermediate.exec(Distinct.java:95)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:187)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:221)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:248)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:198)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:226)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:200)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.processOnePackageOutput(PigCombiner.java:173)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:151)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:58)
>       at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.combineAndSpill(MapTask.java:904)
>       at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:785)
>       at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1600(MapTask.java:286)
>       at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:712)
>
>
>
>
> On Thu, Feb 19, 2009 at 7:23 PM, Alan Gates <[email protected]>
wrote:
>
>> In general the issue we run into with join isn't the number of records
>> in either table but the number of instances of a given key.  Pig's join
>> is written such that it materializes the keys of one input in memory
>> and then streams through the keys of the second input.  If it is unable
>> to materialize all of the instances of the keys in memory it tries to
>> spill those to disk, but that does not always succeed (as you've
>> discovered).  So there are a couple of things to try.
>>
>> 1) Reverse the order of your tables in your join statement.  Pig always
>> streams the keys of the last input, so if one of your inputs has fewer
>> instances of a given key this may help.
>>
>> 2) Reduce the number of maps and reducers per machine and give it all
>> the memory you can.
>>
>> Alan.
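
Concretely, suggestion 1 amounts to swapping the join inputs. A sketch
using the aliases from the script further down the thread; which order is
better depends on which input has fewer tuples per key:

```pig
-- Original order: us_subnets1 is last, so it is streamed, and
-- traffic1's tuples per key are materialized in memory.
jr = JOIN traffic1 BY subnet, us_subnets1 BY subnet;

-- Reversed order: traffic1 (many tuples per subnet) is streamed,
-- and only us_subnets1's tuples per key are held in memory.
jr = JOIN us_subnets1 BY subnet, traffic1 BY subnet;
```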
>>
>>
>> On Feb 18, 2009, at 11:23 AM, Tamir Kamara wrote:
>>
>>  Hi,
>>>
>>> I've verified I'm using the latest version from the svn.
>>> The first job (the join) still fails in the reduce section due to
>>> memory problems, even with 512mb of memory.
>>> I should also point out that traffic1 (after the filter) is 1.5 billion
>>> records and the other file is about 300,000 records. The final outcome
>>> of this whole job should be 50,000 records after grouping by the sld.
>>> The default number of reduce tasks per job is set to 6, and what I saw
>>> is that even with parallel 12 for the join, only 3 reducers really work
>>> hard while the others finish very quickly with no problem. After a
>>> while those 3 fail due to memory problems.
>>> Is this asymmetry between the reducers typical?
>>> Will a higher parallel value help with my issue?
>>>
>>>
>>> Thanks,
>>> Tamir
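
On the parallel question: PARALLEL sets the number of reducers on the
operator itself, but since keys are hash-partitioned, a few very hot
subnet keys will still each land on a single reducer, which matches the
asymmetry described above. A sketch using the thread's aliases:

```pig
-- 12 reducers for the join; hot keys are still not split across reducers.
jr = JOIN traffic1 BY subnet, us_subnets1 BY subnet PARALLEL 12;
```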
>>>
>>>
>>> On Wed, Feb 18, 2009 at 12:14 AM, Tamir Kamara <[email protected]>
>>> wrote:
>>>
>>>  It's the first MR job.
>>>>
>>>> On most machines I have only 4GB, where I set the java memory to 256.
>>>> With 6 maps and 2 reducers on each machine I think there's no memory
>>>> to spare (datanode and tasktracker come up with 1024m each - a default
>>>> I don't understand yet).
>>>> But I got this specific error on a machine with 32GB of RAM, on which
>>>> each task got 640m, so it doesn't seem that upping the memory is the
>>>> way to go for my cluster.
>>>> By the way, on the other machines I get a different memory error: GC
>>>> overhead limit exceeded.
>>>> Do you think I should lower the map/reduce tasks to 3/1 and, with the
>>>> freed-up memory, double each one to 512m?
>>>>
>>>> I forgot to attach the log before; it's attached now.
>>>>
>>>> I checked out pig 2 weeks ago following the guide in the wiki.
>>>> How do I find which version it is?
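
Lowering the slots and raising the child heap, as contemplated here, would
be done in the cluster's hadoop-site.xml. A sketch for a Hadoop
0.18/0.19-era cluster (property names assumed for that era; verify against
your Hadoop version):

```xml
<!-- At most 3 map and 1 reduce task per tasktracker. -->
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>3</value>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>1</value>
</property>
<!-- Heap for each spawned map/reduce child JVM. -->
<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx512m</value>
</property>
```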
>>>>
>>>>
>>>>
>>>> On Tue, Feb 17, 2009 at 11:49 PM, Alan Gates <[email protected]>
>>>> wrote:
>>>>
>>>>  Is it the join or group by that is running out of memory?  You can
>>>>> tell by whether it is the first or second map reduce job that is
>>>>> having problems.
>>>>>
>>>>> How much memory do your grid machines have?  If you can up the
>>>>> memory, that will help.
>>>>>
>>>>> What version of pig are you running?  The top-of-trunk code has some
>>>>> changes that process a nested distinct in the combiner, which should
>>>>> prevent you from running out of memory there.
>>>>>
>>>>> Alan.
>>>>>
>>>>>
>>>>> On Feb 17, 2009, at 1:30 PM, Tamir Kamara wrote:
>>>>>
>>>>> Thanks Alan. That is indeed better.
>>>>>>
>>>>>> But now I'm getting stuck on memory problems. I think the reducers
>>>>>> are out of heap memory. The log I attached is from a machine that
>>>>>> runs 2 reducers simultaneously with Xmx640m, io.sort.factor 50 and
>>>>>> io.sort.mb 200.
>>>>>> I think the reducers work ok until they start logging a lot of:
>>>>>> SpillableMemoryManager: low memory handler called
>>>>>>
>>>>>> How can I resolve this issue?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 17, 2009 at 6:43 PM, Alan Gates <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> A couple of pointers:
>>>>>>>
>>>>>>> Group bys where you do a foreach/generate immediately after that
>>>>>>> contains no UDF accomplish nothing other than reorganizing your
>>>>>>> data, so you can drop those.
>>>>>>>
>>>>>>> To accomplish a distinct count, use a distinct nested in a foreach.
>>>>>>>
>>>>>>> So your script should look like:
>>>>>>>
>>>>>>> traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long,
>>>>>>> w:int, e:int, o:int);
>>>>>>> traffic1 = FOREACH traffic GENERATE domain, subnet;
>>>>>>>
>>>>>>> subnet_info = LOAD 'subnet_info.txt' AS (subnet:long,
>>>>>>> country:chararray, sld:chararray, org:chararray);
>>>>>>> us_subnets = FILTER subnet_info BY country eq 'us';
>>>>>>> us_subnets1 = FOREACH us_subnets GENERATE subnet, sld;
>>>>>>>
>>>>>>> jr = JOIN traffic1 BY subnet, us_subnets1 BY subnet;
>>>>>>>
>>>>>>> r0 = FOREACH jr GENERATE sld, domain;
>>>>>>>
>>>>>>> r3 = GROUP r0 BY sld;
>>>>>>> r4 = FOREACH r3 {
>>>>>>>    r5 = r0.domain;
>>>>>>>    r6 = DISTINCT r5;
>>>>>>>    GENERATE group, COUNT(r6) AS domains;
>>>>>>> }
>>>>>>>
>>>>>>> STORE r4 INTO 'sld-domains-count';
>>>>>>>
>>>>>>> Alan.
>>>>>>>
>>>>>>> On Feb 16, 2009, at 11:36 PM, Tamir Kamara wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have the following query where I want to generate (sld, count of
>>>>>>>> distinct domains).
>>>>>>>> The traffic data comes with domain and subnet, and the sld is
>>>>>>>> obtained from a second file (with a join).
>>>>>>>> I had a problem generating this in a simple fashion, especially
>>>>>>>> the distinct domains part. Would you have a look at the script
>>>>>>>> below and help me figure out if there's a way to simplify it?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Tamir
>>>>>>>>
>>>>>>>> traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long,
>>>>>>>> w:int, e:int, o:int);
>>>>>>>> traffic1 = FOREACH traffic GENERATE domain, subnet;
>>>>>>>>
>>>>>>>> traffic_by_subnet = GROUP traffic1 BY subnet;
>>>>>>>> traffic_by_subnet1 = FOREACH traffic_by_subnet GENERATE group AS
>>>>>>>> subnet, traffic1.domain;
>>>>>>>>
>>>>>>>> subnet_info = LOAD 'subnet_info.txt' AS (subnet:long,
>>>>>>>> country:chararray, sld:chararray, org:chararray);
>>>>>>>> us_subnets = FILTER subnet_info BY country eq 'us';
>>>>>>>> us_subnets1 = FOREACH us_subnets GENERATE subnet, sld;
>>>>>>>>
>>>>>>>> jr = JOIN traffic_by_subnet1 BY subnet, us_subnets1 by subnet;
>>>>>>>>
>>>>>>>> r0 = FOREACH jr GENERATE sld, domain;
>>>>>>>> r1 = GROUP r0 BY sld;
>>>>>>>> r2 = FOREACH r1 GENERATE group as sld, flatten(r0.domain) as domain;
>>>>>>>> r3 = GROUP r2 BY domain;
>>>>>>>> r4 = FOREACH r3 GENERATE r2.sld, COUNT(group) as domains;
>>>>>>>>
>>>>>>>> store r4 into 'sld-domains-count';
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>
>
