Stuart,

We use Hadoop in parts of our ETL processing for our data warehouse. We ran into a similar problem of needing to share about 60 million key/value pairs (dimension keys) among the mapper jobs running in the final phase of our ETL process. Our cluster is a small 3-machine, 20-core system. We ended up setting up memcached on 2 of the nodes, each with 4GB. All 3 nodes use the cache via a Java client using batch fetching (about 35 lookups per input record), and the system is performing well. It has the added benefit of being scalable. Some of our keys can be long, so we end up MD5-hashing them to work within the limits of memcached as well as to consume less space. We flush the cache every day, and it takes about 12 minutes to populate it. I highly recommend it as something to explore.
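
In case it's useful, the client side looks roughly like this (a sketch using the spymemcached client; the host names and key layout are illustrative, not our exact code):

import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;

public class DimensionLookup {
    private final MemcachedClient client;

    public DimensionLookup() throws Exception {
        // Two memcached nodes at 4GB each; the client spreads keys across them.
        client = new MemcachedClient(
            AddrUtil.getAddresses("cache1:11211 cache2:11211"));
    }

    // MD5-hash long natural keys so they stay under memcached's 250-byte
    // key limit and take up less space in the cache.
    static String hashKey(String naturalKey) throws Exception {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] digest = md5.digest(naturalKey.getBytes("UTF-8"));
        StringBuilder sb = new StringBuilder(32);
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    // Batch all lookups for one input record (we do ~35) into one round trip.
    public Map<String, Object> lookup(List<String> naturalKeys) throws Exception {
        List<String> hashed = new ArrayList<String>(naturalKeys.size());
        for (String key : naturalKeys) {
            hashed.add(hashKey(key));
        }
        return client.getBulk(hashed);
    }
}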

--sean

On Mar 19, 2009, at 11:27 PM, Jim Twensky wrote:

Stuart,

Why do you use RMI to load your dictionary file? I presume you have (key, value) pairs and each of your mappers does numerous lookups against those pairs. In that case, using memcached may be a simpler option, and again, you don't have to allocate a separate 2GB of space for each of those 3 processes. Would this again be a problem for you to start/stop manually? If so, you may also consider using the distributed cache, like Aaron mentioned above. I've never tried using the distributed cache for files as large as 2GB, but it is still worth trying since it fits in your memory.
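
If you do try the distributed cache, the setup is roughly this (a sketch against the old mapred API; the paths are made up):

import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

public class DictionaryJobSetup {
    public static void addDictionary(JobConf conf) throws Exception {
        // Ship the dictionary to every node; Hadoop copies it to local disk
        // once per node and the mappers read it from there.
        DistributedCache.addCacheFile(new URI("/shared/dictionary.txt"), conf);
    }
}

// Then in the mapper's configure(JobConf conf):
//     Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
//     // open localFiles[0] and load the terms into memory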

Jim

On Thu, Mar 19, 2009 at 8:42 PM, Stuart White <stuart.whi...@gmail.com> wrote:

The nodes in my cluster have 4 cores & 4 GB RAM.  So, I've set
mapred.tasktracker.map.tasks.maximum to 3 (leaving 1 core for
"breathing room").

My process requires a large dictionary of terms (~2GB when loaded
into RAM).  The terms are looked up very frequently, so I want the
terms memory-resident.

So, the problem is, I want 3 processes (to utilize the CPU), each
requiring ~2GB, but my nodes don't have enough memory for each
process to have its own copy of the 2GB of data.  So, I need to
somehow share the 2GB between the processes.

What I have currently implemented is a standalone RMI service that,
during startup, loads the 2GB dictionaries. My mappers are simply RMI
clients that call this RMI service.
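
In skeleton form it looks roughly like this (the class and method names are illustrative, not my actual code):

import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Map;

interface DictionaryService extends Remote {
    String lookup(String term) throws RemoteException;
}

public class DictionaryServer implements DictionaryService {
    private final Map<String, String> terms = new HashMap<String, String>();

    public DictionaryServer(String dictPath) {
        // ... load the ~2GB of dictionaries from dictPath into 'terms' ...
    }

    public String lookup(String term) {
        return terms.get(term);
    }

    public static void main(String[] args) throws Exception {
        DictionaryServer server = new DictionaryServer(args[0]);
        DictionaryService stub =
            (DictionaryService) UnicastRemoteObject.exportObject(server, 0);
        Registry registry = LocateRegistry.createRegistry(1099);
        registry.rebind("dictionary", stub);
        // The mappers are then just clients:
        //     DictionaryService dict = (DictionaryService)
        //         LocateRegistry.getRegistry("localhost").lookup("dictionary");
        //     String value = dict.lookup(term);
    }
}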

This works just fine.  The only problem is that my standalone RMI
service is totally "outside" Hadoop.  I have to ssh onto each of the
nodes, start/stop/reconfigure the services manually, etc...

So, I was thinking that, at job startup, the processes on each node
would (using ZooKeeper) elect a leader responsible for hosting the 2GB dictionaries. This process would load the dictionaries and share them
via RMI.  The other processes would recognize that another process on
the box is the leader, and they would act as RMI clients to that
process.

To make this work, I'm calling conf.setNumTasksToExecutePerJvm(-1) so
that Hadoop does not create new JVMs for each task.
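
For reference, that part of the job setup boils down to something like this (the class name is just a placeholder):

import org.apache.hadoop.mapred.JobConf;

public class JobSetup {
    public static JobConf configure(Class<?> jobClass) {
        JobConf conf = new JobConf(jobClass);
        // -1 = no limit on tasks per JVM, so the task JVMs (and whichever one
        // ends up holding the 2GB of dictionaries) survive across tasks.
        conf.setNumTasksToExecutePerJvm(-1);
        return conf;
    }
}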

Also note that the processes are "grouped" by node; that is, the
ZooKeeper path that I'll use for coordination will include the
hostname, so that only processes on the same node will compete for
leadership.
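
Putting the election piece together, I'm picturing something roughly like this (a sketch only; the znode path, timeout, and class name are placeholders):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class NodeLeaderElection {
    // Returns true if this process won leadership for its node.
    // Assumes the parent znode /dictionary-leader already exists.
    public static boolean tryBecomeLeader(String zkQuorum, String hostname)
            throws Exception {
        ZooKeeper zk = new ZooKeeper(zkQuorum, 30000, new Watcher() {
            public void process(WatchedEvent event) { /* not needed for the sketch */ }
        });
        // The path includes the hostname, so only processes on the same node
        // compete for leadership.
        String path = "/dictionary-leader/" + hostname;
        try {
            // Ephemeral: if the leader JVM dies, the znode disappears and
            // another process on the node can take over.
            zk.create(path, new byte[0],
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;   // leader: load the dictionaries and share them via RMI
        } catch (KeeperException.NodeExistsException e) {
            return false;  // not the leader: act as an RMI client to the local one
        }
    }
}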

Anyway, in short, I was looking for a way to elect a leader process
per node responsible for hosting/sharing a large amount of
memory-resident data via RMI.

Hopefully that made sense...

