In general, the issue we run into with join isn't the number of records in either table but the number of instances of a given key. Pig's join is written such that it materializes the records for each key from one input in memory and then streams through the records of the second input. If it is unable to fit all of the instances of a key in memory, it tries to spill them to disk, but that does not always succeed (as you've discovered). So there are a couple of things to try:

1) Reverse the order of your tables in your join statement. Pig always streams the last input and materializes the earlier ones in memory, so if one of your inputs has fewer instances of a given key, putting it first may help (see the sketch below).

2) Reduce the number of map and reduce tasks per machine and give each task all the memory you can.
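
For example, using the aliases from the script further down this thread, reversing the inputs puts the small relation first (so it is materialized) and the large one last (so it is streamed); the PARALLEL value here is only illustrative:

-- us_subnets1 (about 300,000 records) is materialized per key;
-- traffic1 (about 1.5 billion records) is streamed.
jr = JOIN us_subnets1 BY subnet, traffic1 BY subnet PARALLEL 12;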

Alan.

On Feb 18, 2009, at 11:23 AM, Tamir Kamara wrote:

Hi,

I've verified I'm using the latest version from svn.
The first job (the join) still fails in the reduce phase due to memory problems, even with 512MB of memory.
I should also point out that traffic1 (after the filter) is 1.5 billion records while the other file is about 300,000 records. The final outcome of this whole job should be about 50,000 records after grouping by the sld.
The default number of reduce tasks per job is set to 6, and what I saw is that even with PARALLEL 12 for the join, only 3 reducers really work hard while the others finish very quickly with no problem. After a while those 3 fail due to memory problems.
Is this asymmetry between the reducers typical?
Will a higher PARALLEL value help with my issue?


Thanks,
Tamir


On Wed, Feb 18, 2009 at 12:14 AM, Tamir Kamara <[email protected]> wrote:

It's the first MR job.

On most machines I have only 4GB, and I set the Java task memory to 256m. With 6 maps and 2 reducers on each machine I think there's no memory to spare: 8 tasks at 256m is 2GB, and the datanode and tasktracker come up with 1024m each (a default I don't understand yet), which together roughly fills the 4GB.
But this specific error I got on a machine with 32GB of RAM, where each task got 640m and that still wasn't enough, so it doesn't seem that upping the memory is the way to go for my cluster.
By the way, on the other machines I get a different memory error: GC overhead limit exceeded.
Do you think I should lower the map/reduce tasks per machine to 3/1 and use the freed-up memory to double each one to 512m?
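
For reference, that change would be the per-tasktracker settings in hadoop-site.xml (a sketch, assuming the Hadoop 0.18/0.19-era property names; adjust to your version):

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>3</value>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>1</value>
</property>
<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx512m</value>
</property>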

I forgot to attach the log before; it's attached now.

I checked out Pig 2 weeks ago, following the guide in the wiki.
How do I find out what version it is?



On Tue, Feb 17, 2009 at 11:49 PM, Alan Gates <[email protected]> wrote:

Is it the join or group by that is running out of memory? You can tell by whether it is the first or second map reduce job that is having problems.

How much memory do your grid machines have? If you can up the memory that
will help.

What version of pig are you running?  The top of trunk code has some
changes that process a nested distinct in the combiner, which should prevent
you from running out of memory there.

Alan.


On Feb 17, 2009, at 1:30 PM, Tamir Kamara wrote:

Thanks Alan. That is indeed better.

But now I'm getting stuck on memory problems. I think the reducers are running out of heap memory. The log I attached is from a machine that runs 2 reducers simultaneously with -Xmx640m, io.sort.factor 50 and io.sort.mb 200.
I think the reducers work fine until they start logging a lot of:
SpillableMemoryManager: low memory handler called

How can I resolve this issue?



On Tue, Feb 17, 2009 at 6:43 PM, Alan Gates <[email protected]> wrote:

A couple of pointers:

A GROUP BY that is immediately followed by a FOREACH ... GENERATE containing no UDF accomplishes nothing other than reorganizing your data, so you can drop those steps.

To compute a distinct count, nest a DISTINCT inside a FOREACH.
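
In general form the idiom looks like this (a minimal sketch; rel, key, and field are placeholder names):

grp = GROUP rel BY key;
cnt = FOREACH grp {
    uniq = DISTINCT rel.field;  -- distinct values of field within each group
    GENERATE group, COUNT(uniq);
}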

So your script should look like:

traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long, w:int, e:int, o:int);
traffic1 = FOREACH traffic GENERATE domain, subnet;

subnet_info = LOAD 'subnet_info.txt' AS (subnet:long, country:chararray, sld:chararray, org:chararray);
us_subnets = FILTER subnet_info BY country eq 'us';
us_subnets1 = FOREACH us_subnets GENERATE subnet, sld;

jr = JOIN traffic1 BY subnet, us_subnets1 BY subnet;

r0 = FOREACH jr GENERATE sld, domain;

r3 = GROUP r0 BY sld;
r4 = FOREACH r3 {
    r5 = r0.domain;
    r6 = DISTINCT r5;
    GENERATE group, COUNT(r6) AS domains;
}

STORE r4 INTO 'sld-domains-count';

Alan.

On Feb 16, 2009, at 11:36 PM, Tamir Kamara wrote:

Hi,


I have the following query where I want to generate (sld, count of distinct domains).
The traffic data comes with domain and subnet, and the sld is obtained from a second file (with a join).
I had a problem generating this in a simple fashion, especially the distinct-domains part. Would you have a look at the script below and help me figure out if there's a way to simplify it?

Thanks,
Tamir

traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long, w:int, e:int, o:int);
traffic1 = FOREACH traffic GENERATE domain, subnet;

traffic_by_subnet = GROUP traffic1 BY subnet;
traffic_by_subnet1 = FOREACH traffic_by_subnet GENERATE group AS subnet, traffic1.domain;

subnet_info = LOAD 'subnet_info.txt' AS (subnet:long, country:chararray, sld:chararray, org:chararray);
us_subnets = FILTER subnet_info BY country eq 'us';
us_subnets1 = FOREACH us_subnets GENERATE subnet, sld;

jr = JOIN traffic_by_subnet1 BY subnet, us_subnets1 BY subnet;

r0 = FOREACH jr GENERATE sld, domain;
r1 = GROUP r0 BY sld;
r2 = FOREACH r1 GENERATE group AS sld, FLATTEN(r0.domain) AS domain;
r3 = GROUP r2 BY domain;
r4 = FOREACH r3 GENERATE r2.sld, COUNT(group) AS domains;

STORE r4 INTO 'sld-domains-count';
