Stuart,
We use Hadoop in parts of our ETL processing for our data warehouse.
We ran into a similar problem of needing to share about 60 million key
value pairs (dimension keys) amongst the mapper jobs running in the
final phase of our ETL process. Our cluster is a small 3-machine,
20-core system. We ended up setting up 2 nodes with memcached, both set
at 4GB. All 3 nodes use the cache via a java client using batch
fetching (about 35 lookups per input record) and the system is
performing well. It has the added benefit of being scalable. Some
of our keys can be long so we end up MD5 hashing them to work within
the limits of memcached as well as consuming less space. We flush the
cache every day and it takes about 12 minutes to populate it. I
highly recommend it as something to explore.
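
A minimal sketch of the key hashing described above, assuming the keys
are plain strings encoded as UTF-8. memcached limits keys to 250 bytes
and does not allow whitespace in them, so a fixed 32-character hex
digest always fits. The class and method names here are hypothetical,
not taken from our actual code, and the memcached client call itself is
left out.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: turns an arbitrarily long dimension key into a
// fixed-length, memcached-safe cache key.
final class CacheKeys {
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    // Returns the MD5 digest of rawKey as 32 lowercase hex characters.
    static String md5Hex(String rawKey) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(rawKey.getBytes(StandardCharsets.UTF_8));
            char[] out = new char[digest.length * 2];
            for (int i = 0; i < digest.length; i++) {
                int b = digest[i] & 0xff;       // treat the byte as unsigned
                out[2 * i] = HEX[b >>> 4];      // high nibble
                out[2 * i + 1] = HEX[b & 0x0f]; // low nibble
            }
            return new String(out);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

The same hashed key has to be used both when populating the cache and
when looking values up. Two different raw keys could in principle hash
to the same digest, so this trades a very small collision risk for
shorter keys.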
--sean
On Mar 19, 2009, at 11:27 PM, Jim Twensky wrote:
Stuart,
Why do you use RMI to load your dictionary file? I presume you have
(key, value) pairs and each of your mappers does numerous lookups on
those pairs. In that case, using memcached may be a simpler option,
and you don't have to allocate a separate 2 GB space for each of those
3 processes. Would it again be a problem for you to start/stop it
manually? If so, you may also consider using the distributed cache, as
Aaron mentioned above. I've never tried using the distributed cache for
files as large as 2 GB, but it is still worth trying since it fits in
your memory.
Jim
On Thu, Mar 19, 2009 at 8:42 PM, Stuart White
<stuart.whi...@gmail.com> wrote:
The nodes in my cluster have 4 cores & 4 GB RAM. So, I've set
mapred.tasktracker.map.tasks.maximum to 3 (leaving 1 core for
"breathing room").
My process requires a large dictionary of terms (~ 2GB when loaded
into RAM). The terms are looked-up very frequently, so I want the
terms memory-resident.
So, the problem is, I want 3 processes (to utilize CPU), but each
process requires ~2GB, but my nodes don't have enough memory to each
have their own copy of the 2GB of data. So, I need to somehow share
the 2GB between the processes.
What I have currently implemented is a standalone RMI service that,
during startup, loads the 2GB dictionaries. My mappers are simply RMI
clients that call this RMI service.
This works just fine. The only problem is that my standalone RMI
service is totally "outside" Hadoop. I have to ssh onto each of the
nodes, start/stop/reconfigure the services manually, etc...
So, I was thinking that, at job startup, the processes on each node
would (using ZooKeeper) elect a leader responsible for hosting the 2GB
dictionaries. This process would load the dictionaries and share them
via RMI. The other processes would recognize that another process on
the box is the leader, and they would act as RMI clients to that
process.
To make this work, I'm calling conf.setNumTasksToExecutePerJvm(-1) so
that Hadoop does not create new JVMs for each task.
Also note that the processes are "grouped" by node; that is, the
ZooKeeper path that I'll use for coordination will include the
hostname, so that only processes on the same node will compete for
leadership.
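
As a rough sketch of that per-node grouping, the coordination path could
be built like this. The root path "/dict-leader" and the class and
method names are placeholders made up for illustration. The election
itself (for example, each process trying to create a node under this
path in ZooKeeper) is not shown.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: builds the per-node path that processes on the
// same host would use to compete for leadership.
final class NodeElectionPath {

    // Joins the root and hostname with exactly one slash between them.
    static String forHost(String root, String hostname) {
        String trimmedRoot = root.endsWith("/")
                ? root.substring(0, root.length() - 1)
                : root;
        return trimmedRoot + "/" + hostname;
    }

    // Same as forHost, using the hostname reported by the local machine.
    static String forLocalHost(String root) throws UnknownHostException {
        return forHost(root, InetAddress.getLocalHost().getHostName());
    }
}
```

For example, NodeElectionPath.forHost("/dict-leader", "node1") yields
"/dict-leader/node1", so processes on node1 never contend with
processes on other nodes.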
Anyway, in short, I was looking for a way to elect a leader process
per node responsible for hosting/sharing a large amount of
memory-resident data via RMI.
Hopefully that made sense...