[ 
https://issues.apache.org/jira/browse/MAPREDUCE-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Allen Wittenauer resolved MAPREDUCE-2123.
-----------------------------------------

    Resolution: Not a Problem

> Multiple threads per JVM
> ------------------------
>
>                 Key: MAPREDUCE-2123
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2123
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>            Reporter: Randy Wilson
>
> I have a process that standardizes name and place strings and requires 
> access to Java objects that need a lot of RAM (800 MB).  Hadoop (via Amazon 
> Elastic MapReduce) was running out of memory because it was firing up one 
> JVM per task per slave.  Each JVM needed 1.5 GB, and six of those blew out 
> memory.
> In this case we don't need six different JVMs running--we only need one, 
> with multiple threads.  I tried using a MultithreadedMapper, but it doesn't 
> have a thread-safe "run()" method: it makes three calls to the input source 
> to read one "line", which doesn't work when multiple threads are doing 
> that.  So I had to override the run() method, and overriding it took so 
> much work that it was simpler to skip the MultithreadedMapper entirely.  
> Instead, I took my original mapper and overrode its run() method directly.  
> I fired up n threads, each of which called a method with a 
> synchronized(mutex) block around the part that makes the three calls to 
> the input source to get the next line to operate on.  Then, outside the 
> synchronized block, it called the map() method, which makes use of the 
> large, shared (singleton) standardization object.
> All of this made me wonder why Hadoop fires up multiple JVMs per slave in 
> the first place--that is a lot of overhead per thread.  I've also been 
> warned that continually reusing JVMs instead of restarting one per task 
> will use up more memory.  That should only be true if Hadoop (or our 
> mapper) is leaking memory.  If that's the case, let's get it fixed.
> My guess is that since Hadoop can run tasks in languages other than 
> Java--and since other languages may have less overhead per process--firing 
> up a JVM per task (or per thread) keeps Hadoop simple.  But the 
> multithreaded solution we built is very general-purpose.  It seems like it 
> ought to be pretty much the default solution in Java, and a "...map.threads" 
> property should be all that is required to fire up that many threads for 
> each task, rather than having to jump through the hoops we did.
> Below is the implementation that seems to be working:
> In the main class:
>     Configuration config = getConf();
>     config.set("num_threads_per_jvm", Integer.toString(numThreads));
>     Job job = new Job(config, "Standardize stuff");
> In the Mapper class:
>   // Mutex guarding the non-thread-safe reads from the shared Context.
>   private final Object contextMutex = new Object();
>   public void run(final Context context) throws IOException, InterruptedException {
>     int numThreads = Integer.parseInt(context.getConfiguration().get("num_threads_per_jvm"));
>     setup(context); // setup and cleanup just once, rather than once per thread
>     List<MapRunner> mapRunners = new ArrayList<MapRunner>();
>     for (int i = 0; i < numThreads; i++) {
>       MapRunner mapRunner = new MapRunner(context);
>       mapRunners.add(mapRunner);
>       mapRunner.start();
>     }
>      // Wait for all the threads to complete
>     for (MapRunner mapRunner : mapRunners) {
>       mapRunner.join();
>     }
>     cleanup(context);
>   }
>   private class MapRunner extends Thread {
>     final Context context;
>     private MapRunner(Context context) {
>       this.context = context;
>     }
>     @Override
>     public void run() {
>       boolean gotValue = true;
>       do {
>         try {
>           Text key = null;
>           Text value = null;
>           synchronized(contextMutex) {
>             gotValue = context.nextKeyValue();
>             if (gotValue) {
>               key = context.getCurrentKey();
>               value = context.getCurrentValue();
>             }
>           }
>           if (gotValue) {
>             map(key, value, context);
>           }
>         } catch (Exception e) {
>           throw new RuntimeException(e);
>         }
>       } while (gotValue);
>     }
>   }
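For comparison, the pattern in the snippet above (one synchronized fetch from a shared source, with the map work done outside the lock) can be sketched in plain Java without any Hadoop types. The class and method names below are hypothetical stand-ins: an `Iterator` plays the role of the mapper `Context`, and `map()` stands in for the user's thread-safe map body.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Plain-Java sketch (no Hadoop dependency): N threads share one record
// source, serialize only the "fetch next record" step, and run the map
// work in parallel outside the lock.
public class SharedSourceRunner {
    private final Object sourceMutex = new Object();
    private final Iterator<String> source;  // stands in for context.nextKeyValue()
    private final ConcurrentLinkedQueue<String> output = new ConcurrentLinkedQueue<>();

    public SharedSourceRunner(Iterator<String> source) {
        this.source = source;
    }

    // Stands in for the thread-safe map() body using the shared singleton.
    private String map(String value) {
        return value.toUpperCase();
    }

    public List<String> run(int numThreads) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(() -> {
                while (true) {
                    String value;
                    synchronized (sourceMutex) {   // only the fetch is serialized
                        if (!source.hasNext()) {
                            return;                // source exhausted: thread exits
                        }
                        value = source.next();
                    }
                    output.add(map(value));        // map runs in parallel
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return new ArrayList<>(output);
    }
}
```

As in the mapper above, the key design point is that only the record fetch holds the mutex, so all the threads share a single copy of the large in-memory object while still mapping concurrently; output order is unspecified.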



--
This message was sent by Atlassian JIRA
(v6.2#6252)
