Hi Steve, I think I was able to reproduce your problem over the weekend
(not sure though, it may be a different problem).  In my case it was that
the mappers were timing out during the merge phase.  I also think the
related tickets are
MAPREDUCE-2177 <https://issues.apache.org/jira/browse/MAPREDUCE-2177> and
MAPREDUCE-2187 <https://issues.apache.org/jira/browse/MAPREDUCE-2187>.  In
my case I oversubscribed the cluster a bit with respect to the # of
map/reduce slots.

In general, this is quite an unusual workload, as every byte in the original
dataset generates 100x as much output very fast.  This workload requires
special tuning:

- undersubscribe the nodes with respect to # of mappers/reducers (say, use
  only 1/2 of the # of spindles for each of
  *mapred.tasktracker.map.tasks.maximum* and
  *mapred.tasktracker.reduce.tasks.maximum*)
- increase *mapred.reduce.slowstart.completed.maps* (set to ~0.95 so that
  reducers do not interfere with working mappers)
- reduce *mapred.merge.recordsBeforeProgress* (set to 100, the default is
  10000)
- reduce *mapred.combine.recordsBeforeProgress* (set to 100, the default is
  10000)
- decrease the *dfs.block.size* for the input file so that each mapper
  handles less data
- increase the # of reducers so that each reducer handles less data
- increase *io.sort.mb* and child memory to decrease the # of spills
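
A rough sketch of the job-level part of the list above, assuming the driver
passes its arguments through GenericOptionsParser so that `-D key=value`
options reach the job configuration. The names TuningSketch, slotsPerNode,
jobSettings and asGenericOptions, and the floor of one slot per node, are
made up for illustration; only the property names and values come from the
list. The two mapred.tasktracker.* limits are per-node TaskTracker settings
(mapred-site.xml), so the sketch only computes a value for them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TuningSketch {

    /** Half the number of spindles; the floor of 1 is an assumption. */
    static int slotsPerNode(int spindles) {
        return Math.max(1, spindles / 2);
    }

    /** Job-level properties and the values suggested in the list. */
    static Map<String, String> jobSettings() {
        Map<String, String> settings = new LinkedHashMap<String, String>();
        settings.put("mapred.reduce.slowstart.completed.maps", "0.95");
        settings.put("mapred.merge.recordsBeforeProgress", "100");
        settings.put("mapred.combine.recordsBeforeProgress", "100");
        return settings;
    }

    /** Renders the settings as "-D key=value" generic options. */
    static List<String> asGenericOptions(Map<String, String> settings) {
        List<String> args = new ArrayList<String>();
        for (Map.Entry<String, String> e : settings.entrySet()) {
            args.add("-D");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        // 8 spindles is an example value, not a measurement from any cluster
        System.out.println("slots per node: " + slotsPerNode(8));
        System.out.println(String.join(" ", asGenericOptions(jobSettings())));
    }
}
```

The printed options would go after the main class name and before the
input/output paths on the `hadoop jar` command line. The block size, the
number of reducers and the sort buffer are left out because no concrete
values are given for them here.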

Hope this helps.  Let me know.

--
Alex K
<http://www.cloudera.com/company/press-center/hadoop-world-nyc/>

On Fri, Jan 20, 2012 at 2:23 PM, Steve Lewis <lordjoe2...@gmail.com> wrote:

> Interesting - I strongly suspect a disk IO or network problem since my code
> is very simple and very fast.
> If you add lines to generateSubStrings to limit String length to 100
> characters (I think it is always that but this makes su
>
> public static String[] generateSubStrings(String inp, int minLength,
>         int maxLength) {
>            // guarantee no more than 100 characters
>            if (inp.length() > 100)
>                inp = inp.substring(0, 100);
>            List<String> holder = new ArrayList<String>();
>            for (int start = 0; start < inp.length() - minLength; start++) {
>                for (int end = start + minLength;
>                     end < Math.min(inp.length(), start + maxLength); end++) {
>                    try {
>                        holder.add(inp.substring(start, end));
>                    }
>                    catch (Exception e) {
>                        throw new RuntimeException(e);
>                    }
>                }
>            }
>            // remainder as in the original posting
>            String[] ret = new String[holder.size()];
>            holder.toArray(ret);
>            return ret;
> }
>
> On Fri, Jan 20, 2012 at 12:41 PM, Alex Kozlov <ale...@cloudera.com> wrote:
>
> > Hi Steve, I ran your job on our cluster and it does not time out.  I
> > noticed that each mapper runs for a long time: one way to avoid a
> > timeout is to update a user counter.  As long as this counter is updated
> > within 10 minutes, the task should not time out (as MR knows that
> > something is being done).  Normally an output bytes counter would be
> > updated, but if the job is stuck somewhere doing something it will time
> > out.  I agree that there might be a disk IO or network problem that
> > causes a long wait, but without detailed logs it's hard to tell.
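
A minimal sketch of the counter idea, assuming the slow part really is
inside map(). Reporter is a hypothetical stand-in for the task context (in a
real mapper the call would be something like context.progress() or
incrementing a user counter); ThrottledHeartbeat, simulate and the interval
of 1000 records are likewise made up for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

final class HeartbeatSketch {

    /** Hypothetical stand-in for the Hadoop task context. */
    interface Reporter {
        void progress();
    }

    /** Calls progress() once every `interval` written records. */
    static final class ThrottledHeartbeat {
        private final Reporter reporter;
        private final long interval;
        private long seen;

        ThrottledHeartbeat(Reporter reporter, long interval) {
            if (interval <= 0) {
                throw new IllegalArgumentException("interval must be positive");
            }
            this.reporter = reporter;
            this.interval = interval;
        }

        /** To be called after each record the mapper writes. */
        void recordWritten() {
            seen++;
            if (seen % interval == 0) {
                reporter.progress();
            }
        }
    }

    /** Writes `records` fake records and returns how many heartbeats fired. */
    static long simulate(long records, long interval) {
        final AtomicLong beats = new AtomicLong();
        ThrottledHeartbeat hb = new ThrottledHeartbeat(new Reporter() {
            public void progress() {
                beats.incrementAndGet();
            }
        }, interval);
        for (long i = 0; i < records; i++) {
            hb.recordWritten();
        }
        return beats.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(2500, 1000)); // prints 2
    }
}
```

This only helps while user code is running. If the stall is inside Hadoop's
own spill or merge code, the mapper never gets a chance to call it.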
> >
> > On a side note, the SubstringCount class should extend Configured.
> >
> > --
> > Alex K
> > <http://www.cloudera.com/company/press-center/hadoop-world-nyc/>
> >
> > On Fri, Jan 20, 2012 at 12:18 PM, Michel Segel
> > <michael_se...@hotmail.com> wrote:
> >
> > > Steve,
> > > If you want me to debug your code, I'll be glad to set up a billable
> > > contract... ;-)
> > >
> > > What I am willing to do is to help you to debug your code...
> > >
> > > Did you time how long it takes in the Mapper.map() method?
> > > The reason I asked this is to first confirm that you are failing
> > > within a map() method.
> > > It could be that you're just not updating your status...
> > >
> > > You said that you are writing many output records for a single input.
> > >
> > > So let's take a look at your code.
> > > Are all writes of the same length? Meaning that in each iteration of
> > > Mapper.map() you will always write K number of rows?
> > >
> > > If so, ask yourself why some iterations are taking longer and longer?
> > >
> > > Note: I'm assuming that the time for each iteration is taking longer
> > > than the previous...
> > >
> > > Or am I missing something?
> > >
> > > -Mike
> > >
> > > Sent from a remote device. Please excuse any typos...
> > >
> > > Mike Segel
> > >
> > > On Jan 20, 2012, at 11:16 AM, Steve Lewis <lordjoe2...@gmail.com> wrote:
> > >
> > > > > We have been having problems with mappers timing out after 600 sec
> > > > > when the mapper writes many more, say thousands of records for every
> > > > > input record - even when the code in the mapper is small and fast. I
> > > > > have no idea what could cause the system to be so slow and am
> > > > > reluctant to raise the 600 sec limit without understanding why there
> > > > > should be a timeout when all MY code is very fast.
> > > > >
> > > > > I am enclosing a small sample which illustrates the problem. It will
> > > > > generate a 4GB text file on hdfs if the input file does not exist or
> > > > > is not at least that size and this will take some time (hours in my
> > > > > configuration) - then the code is essentially wordcount but instead
> > > > > of finding and emitting words - the mapper emits all substrings of
> > > > > the input data - this generates much more output data and many more
> > > > > output records than wordcount generates.
> > > > > Still, the amount of data emitted is no larger than other data sets I
> > > > > know Hadoop can handle.
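
A rough count of what one input line turns into, assuming 100-character
lines and the loop bounds of the posted generateSubStrings with
MINIMUM_LENGTH = 5 and MAXIMUM_LENGTH = 32. AmplificationSketch and
countForLine are made-up names; the loops mirror the posted ones but only
count instead of building strings.

```java
final class AmplificationSketch {

    /**
     * Returns {records, keyCharacters} emitted for one line, using the same
     * exclusive loop bounds as the posted generateSubStrings.
     */
    static long[] countForLine(int lineLength, int minLength, int maxLength) {
        long records = 0;
        long chars = 0;
        for (int start = 0; start < lineLength - minLength; start++) {
            int limit = Math.min(lineLength, start + maxLength);
            for (int end = start + minLength; end < limit; end++) {
                records++;            // one context.write() per substring
                chars += end - start; // length of the emitted key
            }
        }
        return new long[] {records, chars};
    }

    public static void main(String[] args) {
        long[] r = countForLine(100, 5, 32);
        System.out.println(r[0] + " records, " + r[1] + " key characters");
        // prints "2214 records, 38214 key characters"
    }
}
```

So each 100-character line produces about two thousand records and a few
hundred times its own size in keys alone, before the combiner runs and
without counting the value and per-record overhead.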
> > > >
> > > > > All mappers on my 8 node cluster eventually time out after 600 sec -
> > > > > even though I see nothing in the code which is even a little slow and
> > > > > suspect that any slow behavior is in the called Hadoop code. This is
> > > > > similar to a problem we have in bioinformatics where a colleague saw
> > > > > timeouts on his 50 node cluster.
> > > >
> > > > > I would appreciate any help from the group. Note - if you have a text
> > > > > file of at least 4 GB the program will take that as an input without
> > > > > trying to create its own file.
> > > > /*
> > > >
> > >
> >
> ============================================================================================
> > > > */
> > > > import org.apache.hadoop.conf.*;
> > > > import org.apache.hadoop.fs.*;
> > > > import org.apache.hadoop.io.*;
> > > > import org.apache.hadoop.mapreduce.*;
> > > > import org.apache.hadoop.mapreduce.lib.input.*;
> > > > import org.apache.hadoop.mapreduce.lib.output.*;
> > > > import org.apache.hadoop.util.*;
> > > >
> > > > import java.io.*;
> > > > import java.util.*;
> > > > /**
> > > > * org.systemsbiology.hadoop.SubstringGenerator
> > > >  *
> > > >  * This illustrates an issue we are having where a mapper generating
> a
> > > > much larger volume of
> > > >  * data ans number of records times out even though the code is
> small,
> > > > simple and fast
> > > >  *
> > > >  * NOTE!!! as written the program will generate a 4GB file in hdfs
> with
> > > > good input data -
> > > >  * this is done only if the file does not exist but may take several
> > > > hours. It will only be
> > > >  * done once. After that the failure is fairly fast
> > > >  *
> > > > * What this will do is count  unique Substrings of lines of length
> > > > * between MIN_SUBSTRING_LENGTH and MAX_SUBSTRING_LENGTH by generatin
> > all
> > > > * substrings and  then using the word could algorithm
> > > > * What is interesting is that the number and volume or writes in the
> > > >  * map phase is MUCH larger than the number of reads and the volume
> of
> > > > read data
> > > >  *
> > > >  * The example is artificial but similar the some real BioInformatics
> > > > problems -
> > > >  *  for example finding all substrings in a gemome can be important
> for
> > > > the design of
> > > >  *  microarrays.
> > > >  *
> > > >  *  While the real problem is more complex - the problem is that
> > > >  *  when the input file is large enough the mappers time out failing
> to
> > > > report after
> > > >  *  600 sec. There is nothing slow in any of the application code and
> > > > nothing I
> > > > */
> > > > public class SubstringCount  implements Tool   {
> > > >
> > > >
> > > >    public static final long ONE_MEG = 1024 * 1024;
> > > >    public static final long ONE_GIG = 1024 * ONE_MEG;
> > > >    public static final int LINE_LENGTH = 100;
> > > >    public static final Random RND = new Random();
> > > >
> > > >   // NOTE - edit this line to be a sensible location in the current
> > file
> > > > system
> > > >    public static final String INPUT_FILE_PATH = "BigInputLines.txt";
> > > >   // NOTE - edit this line to be a sensible location in the current
> > file
> > > > system
> > > >    public static final String OUTPUT_FILE_PATH = "output";
> > > >     // NOTE - edit this line to be the input file size - 4 * ONE_GIG
> > > > should be large but not a problem
> > > >    public static final long DESIRED_LENGTH = 4 * ONE_GIG;
> > > >    // NOTE - limits on substring length
> > > >    public static final int MINIMUM_LENGTH = 5;
> > > >    public static final int MAXIMUM_LENGTH = 32;
> > > >
> > > >
> > > >    /**
> > > >     * create an input file to read
> > > >     * @param fs !null file system
> > > >     * @param p  !null path
> > > >     * @throws IOException om error
> > > >     */
> > > >    public static void guaranteeInputFile(FileSystem fs, Path p)
> throws
> > > > IOException {
> > > >        if (fs.isFile(p)) {
> > > >            FileStatus fileStatus = fs.getFileStatus(p);
> > > >            if (fileStatus.getLen() >= DESIRED_LENGTH)
> > > >                return;
> > > >        }
> > > >        FSDataOutputStream open = fs.create(p);
> > > >        PrintStream ps = new PrintStream(open);
> > > >         long showInterval = DESIRED_LENGTH  / 100;
> > > >        for (long i = 0; i < DESIRED_LENGTH; i += LINE_LENGTH) {
> > > >            writeRandomLine(ps, LINE_LENGTH);
> > > >            // show progress
> > > >            if(i % showInterval == 0)  {
> > > >                System.err.print(".");
> > > >
> > > >            }
> > > >        }
> > > >        System.err.println("");
> > > >        ps.close();
> > > >    }
> > > >
> > > >    /**
> > > >     * write a line with a random string of capital letters
> > > >     *
> > > >     * @param pPs         -  output
> > > >     * @param pLineLength length of the line
> > > >     */
> > > >    public static void writeRandomLine(final PrintStream pPs, final
> int
> > > > pLineLength) {
> > > >        StringBuilder sb = new StringBuilder();
> > > >        for (int i = 0; i < pLineLength; i++) {
> > > >            char c = (char) ('A' + RND.nextInt(26));
> > > >            sb.append(c);
> > > >
> > > >        }
> > > >        String s = sb.toString();
> > > >        pPs.println(s);
> > > >    }
> > > >
> > > >
> > > >
> > > >    /**
> > > >     * Construct a Configured.
> > > >     */
> > > >    public SubstringCount() {
> > > >    }
> > > >
> > > >
> > > >
> > > >    /**
> > > >     * similar to the Word Count mapper but one line generates a lot
> > more
> > > > output
> > > >     */
> > > >    public static class SubStringMapper
> > > >            extends Mapper<Object, Text, Text, IntWritable> {
> > > >
> > > >        /**
> > > >         * generate a array of substrings
> > > >         *
> > > >         * @param inp       input long string
> > > >         * @param minLength minimum substring length
> > > >         * @param maxLength maximum substring length
> > > >         * @return !null array of strings
> > > >         */
> > > >        public static String[] generateSubStrings(String inp, int
> > > > minLength, int maxLength) {
> > > >            List<String> holder = new ArrayList<String>();
> > > >            for (int start = 0; start < inp.length() - minLength;
> > > start++) {
> > > >                for (int end = start + minLength; end <
> > > > Math.min(inp.length(), start + maxLength); end++) {
> > > >                    try {
> > > >                        holder.add(inp.substring(start, end));
> > > >                    }
> > > >                    catch (Exception e) {
> > > >                        throw new RuntimeException(e);
> > > >
> > > >                    }
> > > >                }
> > > >            }
> > > >
> > > >            String[] ret = new String[holder.size()];
> > > >            holder.toArray(ret);
> > > >             return ret;
> > > >        }
> > > >
> > > >        private final IntWritable one = new IntWritable(1);
> > > >        private final Text word = new Text();
> > > >
> > > >        /**
> > > >         * Like word count except the words are all substrings of the
> > > input
> > > > data
> > > >         * This leads to a large increase
> > > >         * @param key  irrelevant
> > > >         * @param value  one read line
> > > >         * @param context  !null context
> > > >            */
> > > >        public void map(Object key, Text value, Context context
> > > >        ) throws IOException, InterruptedException {
> > > >            String inp = value.toString();
> > > >            // The written data is hundreds of times the input data
> > > >            String[] data = generateSubStrings(inp, MINIMUM_LENGTH,
> > > > MAXIMUM_LENGTH);
> > > >            for (int i = 0; i < data.length; i++) {
> > > >                String s = data[i];
> > > >                word.set(s);
> > > >                context.write(word, one);
> > > >            }
> > > >        }
> > > >    }
> > > >
> > > >    /**
> > > >     * Essentially the same reducer used by word count
> > > >     */
> > > >    public static class IntSumReducer
> > > >            extends Reducer<Text, IntWritable, Text, IntWritable> {
> > > >        private final IntWritable result = new IntWritable();
> > > >
> > > >        public void reduce(Text key, Iterable<IntWritable> values,
> > > >                           Context context
> > > >        ) throws IOException, InterruptedException {
> > > >            int sum = 0;
> > > >            for (IntWritable val : values) {
> > > >                sum += val.get();
> > > >            }
> > > >            result.set(sum);
> > > >            context.write(key, result);
> > > >        }
> > > >    }
> > > >
> > > >
> > > >    /**
> > > >     * kill a directory and all contents
> > > >     * useful to make sure the output directory is empty
> > > >     * @param src !null path of a directory
> > > >     * @param fs !null file system
> > > >     * @return  true on success
> > > >     */
> > > >    public static boolean expunge(Path src, FileSystem fs) {
> > > >        try {
> > > >            if (!fs.exists(src))
> > > >                return true;
> > > >            // break these out
> > > >            if (fs.getFileStatus(src).isDir()) {
> > > >                boolean doneOK = fs.delete(src, true);
> > > >                doneOK = !fs.exists(src);
> > > >                return doneOK;
> > > >            }
> > > >            if (fs.isFile(src)) {
> > > >                boolean doneOK = fs.delete(src, false);
> > > >                return doneOK;
> > > >            }
> > > >            throw new IllegalStateException("should be file of
> directory
> > > if
> > > > it exists");
> > > >        }
> > > >        catch (IOException e) {
> > > >            throw new RuntimeException(e);
> > > >        }
> > > >
> > > >    }
> > > >
> > > >     /**
> > > >      * this implementation of run allows the program to start with a
> > > > Configuration which may
> > > >      * have been filled by other code
> > > >      * @param conf !null conf
> > > >      * @param args  !null arguments
> > > >      * @return 0 on success
> > > >      * @throws Exception  on error
> > > >      */
> > > >    public int runJob(Configuration conf, String[] args) throws
> > Exception
> > > {
> > > >        String[] realArgs = new String[2];
> > > >
> > > >        String[] otherArgs = new GenericOptionsParser(conf,
> > > > args).getRemainingArgs();
> > > >
> > > >        if(otherArgs.length > 0)
> > > >             realArgs[0] = otherArgs[0];
> > > >         else
> > > >             realArgs[0] = INPUT_FILE_PATH;
> > > >        if(otherArgs.length > 1)
> > > >             realArgs[1] = otherArgs[1];
> > > >         else
> > > >             realArgs[1] = OUTPUT_FILE_PATH;
> > > >
> > > >        Path InputPath = new Path(realArgs[0]);
> > > >        Path outputDir =  new Path(realArgs[1]);
> > > >
> > > >         Job job = new Job(conf, "Substring Generator");
> > > >        conf = job.getConfiguration(); // NOTE JOB Copies the
> > configuraton
> > > >        job.setJarByClass(SubstringCount.class);
> > > >        job.setMapperClass(SubStringMapper.class);
> > > >        job.setCombinerClass(IntSumReducer.class);
> > > >        job.setReducerClass(IntSumReducer.class);
> > > >
> > > >        job.setMapOutputKeyClass(Text.class);
> > > >        job.setMapOutputValueClass(IntWritable.class);
> > > >        job.setOutputKeyClass(Text.class);
> > > >        job.setOutputValueClass(IntWritable.class);
> > > >
> > > >         job.setNumReduceTasks(72); // todo size for your cluster
> > > >        FileInputFormat.addInputPath(job,InputPath);
> > > >
> > > >
> > > >        FileSystem fileSystem = outputDir.getFileSystem(conf);
> > > > >        // make sure the output directory is empty
> > > > >        expunge(outputDir, fileSystem);    // make sure this does not exist
> > > >        FileOutputFormat.setOutputPath(job, outputDir);
> > > >
> > > >        // Now create something for the job to read
> > > >        guaranteeInputFile(fileSystem, InputPath);
> > > >
> > > >
> > > >        boolean ans = job.waitForCompletion(true);
> > > >        int ret = ans ? 0 : 1;
> > > >        return ret;
> > > >    }
> > > >
> > > >
> > > >    /**
> > > >     * Execute the command with the given arguments.
> > > >     *
> > > >     * @param args command specific arguments.
> > > >     * @return exit code.
> > > >     * @throws Exception
> > > >     */
> > > >    @Override
> > > >    public int run(final String[] args) throws Exception {
> > > >        Configuration conf = new Configuration();
> > > >        return runJob(conf, args);
> > > >    }
> > > >
> > > >    /**
> > > >     *
> > > >     * @param args  args[0] is the path to a file to be created in the
> > > > FileSystem
> > > >     * args[1] is the path to an output directory in the file system -
> > the
> > > > contents WILL be deleted
> > > >     * @throws Exception  on error
> > > >     */
> > > >    public static void main(String[] args) throws Exception {
> > > >        ToolRunner.run(new SubstringCount(), args);
> > > >    }
> > > > }
> > > >
> > > > --
> > > > Steven M. Lewis PhD
> > > > 4221 105th Ave NE
> > > > Kirkland, WA 98033
> > > > 206-384-1340 (cell)
> > > > Skype lordjoe_com
> > >
> >
>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
>
