In general the issue we run into with join isn't the number of records
in either table but the number of instances of a given key. Pig's
join is written such that it materializes the keys of one input in
memory and then streams through the keys of the second input. If it
is unable to materialize all of the instances of the keys in memory it
tries to spill those to disk, but that does not always succeed (as
you've discovered). So there are a couple of things to try.
1) Reverse the order of your tables in your join statement. Pig
always streams the keys of the last input, so if one of your inputs
has fewer instances of a given key, listing it first and the larger
input last may help.
2) Reduce the number of maps and reducers per machine and give it all
the memory you can.
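As a rough sketch of suggestion 1, using the aliases from the script further down this thread: the smaller input goes first and the large traffic data goes last so that it is the one being streamed. The PARALLEL value here is only illustrative, and which input really has fewer rows per subnet is an assumption based on the record counts quoted below.

```pig
-- Sketch only: aliases and file names are taken from the script in this thread.
traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long, w:int, e:int, o:int);
traffic1 = FOREACH traffic GENERATE domain, subnet;

subnet_info = LOAD 'subnet_info.txt' AS (subnet:long, country:chararray, sld:chararray, org:chararray);
us_subnets = FILTER subnet_info BY country eq 'us';
us_subnets1 = FOREACH us_subnets GENERATE subnet, sld;

-- us_subnets1 (about 300,000 records) is listed first, so its rows for a key are held in memory.
-- traffic1 (about 1.5 billion records) is listed last, so its rows are streamed.
-- PARALLEL 12 is an illustrative value, not a recommendation.
jr = JOIN us_subnets1 BY subnet, traffic1 BY subnet PARALLEL 12;
r0 = FOREACH jr GENERATE sld, domain;
```

The rest of the script can stay as it is, since sld and domain are still unambiguous field names after the inputs are swapped.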
Alan.
On Feb 18, 2009, at 11:23 AM, Tamir Kamara wrote:
Hi,
I've verified that I'm using the latest version from svn.
The first job (the join) still fails in the reduce phase due to memory
problems, even with 512 MB of memory.
I should also point out that traffic1 (after the filter) is 1.5 billion
records and the other file is about 300,000 records. The final output of
this whole job should be 50,000 records after grouping by the sld.
The default number of reduce tasks per job is set to 6. What I saw is
that even with PARALLEL 12 for the join, only 3 reducers really work hard
while the others finish very quickly with no problem. After a while
those 3 fail due to memory problems.
Is this asymmetry between the reducers typical?
Will a higher PARALLEL value help with my issue?
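One possible explanation for a few reducers doing most of the work is that a handful of subnet values account for a large share of the traffic rows. A minimal sketch for checking that, assuming the same input file and schema as the script in this thread; the aliases by_subnet, subnet_counts, sorted_counts and top_subnets and the cutoff of 20 are made up for illustration, and LIMIT may not be available in older Pig builds.

```pig
-- Diagnostic sketch: count traffic rows per join key and list the heaviest keys.
traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long, w:int, e:int, o:int);
traffic1 = FOREACH traffic GENERATE domain, subnet;

-- One group per subnet; COUNT gives the number of rows sharing that key.
by_subnet = GROUP traffic1 BY subnet;
subnet_counts = FOREACH by_subnet GENERATE group AS subnet, COUNT(traffic1) AS n;

-- Largest keys first; 20 is an arbitrary cutoff.
sorted_counts = ORDER subnet_counts BY n DESC;
top_subnets = LIMIT sorted_counts 20;
DUMP top_subnets;
```

If a few subnets dominate the output, raising PARALLEL alone is unlikely to spread their rows out, because all rows for one key go to the same reducer.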
Thanks,
Tamir
On Wed, Feb 18, 2009 at 12:14 AM, Tamir Kamara wrote:
It's the first MR job.
On most machines I have only 4 GB of RAM, and I set the Java memory to
256m. With 6 maps and 2 reducers on each machine I think there's no
memory to spare (the datanode and tasktracker each come up with 1024m,
which is a default I don't understand yet).
But I got this specific error on a machine with 32 GB of RAM, on which
each task got 640m. Since even that isn't enough, it doesn't seem that
upping the memory is the way to go for my cluster.
By the way, on the other machines I get a different memory error: GC
overhead limit exceeded.
Do you think I should lower the map/reduce tasks to 3/1 and use the
freed-up memory to double each task to 512m?
I forgot to attach the log before; it's attached now.
I checked out Pig 2 weeks ago following the guide in the wiki.
How do I find out what version it is?
On Tue, Feb 17, 2009 at 11:49 PM, Alan Gates wrote:
Is it the join or the group by that is running out of memory? You can
tell by whether it is the first or second map reduce job that is having
problems.
How much memory do your grid machines have? If you can up the memory,
that will help.
What version of pig are you running? The top of trunk code has some
changes that process a nested distinct in the combiner, which should
prevent you from running out of memory there.
Alan.
On Feb 17, 2009, at 1:30 PM, Tamir Kamara wrote:
Thanks Alan. That is indeed better.
But now I'm getting stuck on memory problems. I think the reducers are
running out of heap memory. The log I attached is from a machine that
runs 2 reducers simultaneously with Xmx640m, io.sort.factor 50 and
io.sort.mb 200.
I think the reducers work OK until they start logging a lot of:
SpillableMemoryManager: low memory handler called
How can I resolve this issue?
On Tue, Feb 17, 2009 at 6:43 PM, Alan Gates wrote:
A couple of pointers:
Group bys where you do a foreach/generate immediately after that contains
no UDF accomplish nothing other than reorganizing your data, so you can
drop those.
To accomplish a distinct count, use distinct nested in a foreach.
So your script should look like:
traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long, w:int, e:int, o:int);
traffic1 = FOREACH traffic GENERATE domain, subnet;
subnet_info = LOAD 'subnet_info.txt' AS (subnet:long, country:chararray, sld:chararray, org:chararray);
us_subnets = FILTER subnet_info BY country eq 'us';
us_subnets1 = FOREACH us_subnets GENERATE subnet, sld;
jr = JOIN traffic1 BY subnet, us_subnets1 BY subnet;
r0 = FOREACH jr GENERATE sld, domain;
-- group by sld so that each group holds the domains seen for that sld
r3 = GROUP r0 BY sld;
r4 = FOREACH r3 {
    r5 = r0.domain;
    r6 = DISTINCT r5;
    GENERATE group AS sld, COUNT(r6) AS domains;
}
STORE r4 INTO 'sld-domains-count';
Alan.
On Feb 16, 2009, at 11:36 PM, Tamir Kamara wrote:
Hi,
I have the following query, where I want to generate (sld, count of
distinct domains).
The traffic data comes with domain and subnet, and the sld is obtained
from a second file (with a join).
I had a problem generating this in a simple fashion, especially the
distinct domains part. Would you have a look at the script below and
help me figure out if there's a way to simplify this?
Thanks,
Tamir
traffic = LOAD 'traffic.txt' AS (domain:chararray, subnet:long, w:int, e:int, o:int);
traffic1 = FOREACH traffic GENERATE domain, subnet;
traffic_by_subnet = GROUP traffic1 BY subnet;
traffic_by_subnet1 = FOREACH traffic_by_subnet GENERATE group AS subnet, traffic1.domain;
subnet_info = LOAD 'subnet_info.txt' AS (subnet:long, country:chararray, sld:chararray, org:chararray);
us_subnets = FILTER subnet_info BY country eq 'us';
us_subnets1 = FOREACH us_subnets GENERATE subnet, sld;
jr = JOIN traffic_by_subnet1 BY subnet, us_subnets1 by subnet;
r0 = FOREACH jr GENERATE sld, domain;
r1 = GROUP r0 BY sld;
r2 = FOREACH r1 GENERATE group as sld, flatten(r0.domain) as domain;
r3 = GROUP r2 BY domain;
r4 = FOREACH r3 GENERATE r2.sld, COUNT(group) as domains;
store r4 into 'sld-domains-count';