Hi Harsh,

here's the simplest example I could come up with. Add

    protected void setup(Context context) throws IOException, InterruptedException {
        // start some non-daemon thread
        Thread t = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        t.setDaemon(false);
        t.start();
        System.err.println("Started thread in reduce setup");
    }

to the Reduce inner class in the wordcount sample (source code attached). Assuming it's in wordcount.jar and files have been uploaded for counting (no matter what content, of course), running

    hadoop jar wordcount.jar org.myorg.WordCount wordcount/input wordcount/result

gives me, reproducibly, a hanging "Child" process. Interestingly, that does not happen when a thread like the one above is started in Map.setup instead.

One more note: In our case, some non-trivial infrastructure is started and used in map, combine, and reduce. I believe it could be shut down and started again between map and reduce when run in the same JVM. That is, however, expensive and brings no benefit otherwise. If there were a way to know that the JVM will really not be used anymore, that would be a good time to really clean up. Unfortunately, shutdown hooks don't work here, as they will not be run before all non-daemon threads have stopped.

Thanks,
Henning

On 10/27/2011 01:18 PM, Henning Blohm wrote:
Hi Harsh,

that would be 0.20.3. Will try to prepare a stripped down sample later today or tomorrow.

Thanks,
Henning

On 10/27/2011 12:55 PM, Harsh J wrote:
> Hey Henning,
>
> What version of Hadoop are you running, and can we have a dumbed down
> sample to reproduce?
>
> On Thu, Oct 27, 2011 at 3:28 PM, Henning Blohm<henning.bl...@zfabrik.de> wrote:
>> Hi,
>>
>> found that several people have run into this issue, but I was not able to
>> find a solution yet.
>>
>> We have reduce tasks that leave a hanging "child" process. The
>> implementation uses a lot of third party stuff and leaves Timer threads
>> running (as you can readily see in thread dumps). Which is bad style - no
>> doubt. But eventually we don't really care - when the reduce is done, it's
>> done and the process should really just be killed rather than hanging around
>> and eventually impacting the cluster.
>>
>> Is there a way to force killing of child processes, e.g. based on job
>> configuration?
>>
>> Thanks,
>> Henning
>>
>

--
*Henning Blohm*
*ZFabrik Software KG*

T: +49/62278399955
F: +49/62278399956
M: +49/1781891820

Bunsenstrasse 1
69190 Walldorf

henning.bl...@zfabrik.de <mailto:henning.bl...@zfabrik.de>
Linkedin <http://de.linkedin.com/pub/henning-blohm/0/7b5/628>
www.zfabrik.de <http://www.zfabrik.de>
www.z2-environment.eu <http://www.z2-environment.eu>

package org.myorg;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        protected void setup(Context context) throws IOException, InterruptedException {
            // start some non-daemon thread
            Thread t = new Thread(new Runnable() {
                public void run() {
                    while (true) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            t.setDaemon(false);
            t.start();
            System.err.println("Started thread in reduce setup");
        }

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
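
For comparison, here is a minimal sketch of a worker thread that can be stopped cooperatively, so that it no longer keeps the JVM alive. It does not answer the question of force-killing the child process, and it only helps for threads under your own control, not for Timer threads started inside third-party code. The class name StoppableWorker and its methods are hypothetical and not part of Hadoop or of the attached sample. One place such a stop call could go is Reducer.cleanup(Context), but whether that fits a setup where the JVM is shared between map and reduce is an assumption that would need checking.

```java
// Hypothetical helper for illustration; not part of Hadoop or the attached sample.
final class StoppableWorker {

    private final Thread thread;

    StoppableWorker() {
        thread = new Thread(new Runnable() {
            public void run() {
                // Unlike the while (true) loop in the sample, this loop ends
                // once the thread has been interrupted.
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // sleep() clears the interrupt flag when it throws;
                        // restore it so the loop condition sees it and exits.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }, "stoppable-worker");
        // Non-daemon, as in the sample: the JVM waits for this thread.
        thread.setDaemon(false);
    }

    void start() {
        thread.start();
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    /**
     * Interrupts the worker and waits up to timeoutMillis for it to end.
     * Returns true if the thread has terminated by then.
     */
    boolean stop(long timeoutMillis) throws InterruptedException {
        thread.interrupt();
        thread.join(timeoutMillis);
        return !thread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableWorker worker = new StoppableWorker();
        worker.start();
        Thread.sleep(100);
        boolean stopped = worker.stop(5000);
        System.err.println("Worker stopped: " + stopped);
        // Once the only non-daemon worker has ended, main can return
        // and the JVM is free to exit.
    }
}
```

The difference from the thread in the sample is only in how the interrupt is handled: the sample prints the stack trace and keeps looping, so nothing short of killing the process ends that thread.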