[ https://issues.apache.org/jira/browse/MAPREDUCE-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Allen Wittenauer resolved MAPREDUCE-2123.
-----------------------------------------
    Resolution: Not a Problem

> Multiple threads per JVM
> ------------------------
>
>                 Key: MAPREDUCE-2123
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2123
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>            Reporter: Randy Wilson
>
> I have a process that standardizes name and place strings and requires access to Java objects that need a lot of RAM (800MB). Hadoop (via Amazon Elastic MapReduce) was running out of memory because it was firing up one JVM per task per slave. Each JVM needed 1.5GB, and 6 of those blew out memory.
> In this case we don't need 6 different JVMs running--we only need one, but with multiple threads. I tried using a MultithreadedMapper, but it doesn't have a thread-safe run() method: it makes 3 calls to the input source to read one "line", which doesn't work if multiple threads are doing that. So I had to override the run() method. I ended up having to do so much work to override run() that it was simpler to skip the MultithreadedMapper altogether. Instead, I took my original mapper and overrode its run() method directly. I fired up n threads, each of which called a method with a synchronized(mutex) block around the part of the process that makes the three calls to the input source to get the next line to operate on. Then, outside of the synchronized block, it called the map() method, which uses the large, shared (singleton) standardization object.
> All of this made me wonder why Hadoop fires up multiple JVMs per slave in the first place--that is a lot of overhead per thread. I've also been warned that continually reusing JVMs instead of restarting one per task will use up more memory. That should only be true if Hadoop (or our mapper) is leaking memory. If that's the case, let's get it fixed.
> My guess is that since Hadoop can run tasks in languages other than Java--and since other languages may have less overhead per process--firing up a JVM per task (or per thread) simplifies Hadoop. But the multithreaded solution we built is very general-purpose. It seems like it ought to be pretty much the default solution in Java, and a "...map.threads" property should be all that is required to fire up that many threads for each task, rather than having to jump through the hoops we did.
> Below is the implementation that seems to be working:
> In the main class:
>     Configuration config = getConf();
>     config.set("num_threads_per_jvm", Integer.toString(numThreads));
>     Job job = new Job(config, "Standardize stuff");
> In the Mapper class:
>     // Mutex guarding the three Context calls that are not thread-safe.
>     private final Object contextMutex = new Object();
>
>     public void run(final Context context) throws IOException, InterruptedException {
>         int numThreads = Integer.parseInt(context.getConfiguration().get("num_threads_per_jvm"));
>         setup(context); // setup and cleanup just once, rather than once per thread
>         List<MapRunner> mapRunners = new ArrayList<MapRunner>();
>         for (int i = 0; i < numThreads; i++) {
>             MapRunner mapRunner = new MapRunner(context);
>             mapRunners.add(mapRunner);
>             mapRunner.start();
>         }
>         // Wait for all the threads to complete
>         for (MapRunner mapRunner : mapRunners) {
>             mapRunner.join();
>         }
>         cleanup(context);
>     }
>
>     private class MapRunner extends Thread {
>         final Context context;
>
>         private MapRunner(Context context) {
>             this.context = context;
>         }
>
>         @Override
>         public void run() {
>             boolean gotValue = true;
>             do {
>                 try {
>                     Text key = null;
>                     Text value = null;
>                     synchronized (contextMutex) {
>                         gotValue = context.nextKeyValue();
>                         if (gotValue) {
>                             // Copy the key/value: the RecordReader may reuse the same
>                             // objects on the next call to nextKeyValue().
>                             key = new Text(context.getCurrentKey());
>                             value = new Text(context.getCurrentValue());
>                         }
>                     }
>                     if (gotValue) {
>                         map(key, value, context);
>                     }
>                 } catch (Exception e) {
>                     throw new RuntimeException(e);
>                 }
>             } while (gotValue);
>         }
>     }

--
This message was sent by Atlassian JIRA
(v6.2#6252)
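[Editor's note] For reference, the stock alternative the reporter mentions is org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper, which runs a configurable pool of threads over a delegate mapper inside a single task JVM. Below is a minimal job-setup sketch, not taken from the issue: it assumes the setMapperClass/setNumberOfThreads helpers on that class, and "realMapper" plus the thread count are placeholders. It shows only the configuration side and does not address the run()/record-reuse thread-safety concern raised in the description.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

    public class MultithreadedJobSetup {
        // Builds a Job that uses MultithreadedMapper as the task's mapper; the pool of
        // threads it starts all delegate to the "real" mapper class passed in.
        public static Job buildJob(Configuration config, Class<? extends Mapper> realMapper,
                                   int numThreads) throws Exception {
            Job job = new Job(config, "Standardize stuff");
            job.setMapperClass(MultithreadedMapper.class);            // outer mapper = thread pool
            MultithreadedMapper.setMapperClass(job, realMapper);      // inner mapper does the work
            MultithreadedMapper.setNumberOfThreads(job, numThreads);  // e.g. 6 threads per JVM
            return job;
        }
    }

With this arrangement the large standardization singleton would be loaded once per task JVM and shared by all mapper threads, which is the memory behavior the issue asks for; the thread-safety caveats about the Context described above still apply to the delegate mapper.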