Hi,

We have an application whose map phase never finishes when we start it
with more than one mapper. If we start it on a cluster or a
pseudo-distributed cluster with only one mapper and one reducer, it runs
fine. We use a custom FileInputFormat with a custom RecordReader; the
code for both is attached.

This is the mapper function. For clarity I removed most of the code,
because there is no error within the map function. As the log messages
below show for a run with two mappers, both mappers run through the map
code completely with no error. The problem is somewhere after the map
and before the reduce part of the run and, as said before, it only
occurs if we use more than one mapper. Once the job reaches 50% map and
16% reduce, it stops responding and runs indefinitely.

public void map(LongWritable key, Text value,
        OutputCollector<LongWritable, Text> output, Reporter reporter)
        throws IOException {

    LOG.info("Masks: "
            + BinaryStringConverter.parseLongToBinaryString(MASKS[0]) + ", "
            + BinaryStringConverter.parseLongToBinaryString(MASKS[1]) + ", "
            + BinaryStringConverter.parseLongToBinaryString(MASKS[2]) + ", "
            + BinaryStringConverter.parseLongToBinaryString(MASKS[3]));
    .........
    .......
    LOG.info("Finished with mapper commands.");
}



When we start the application with more than one mapper, every mapper
reaches the last LOG.info call of the map function.

However, the output logs of the two mappers differ:
Mapper that failed:

2010-01-05 15:34:16,640 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
2010-01-05 15:34:16,796 INFO de.hpi.hadoop.duplicates.LongRecordReader: Splitting from 0 to 800 length: 800
2010-01-05 15:34:16,828 INFO org.apache.hadoop.mapred.MapTask: numReduceTasks: 2
2010-01-05 15:34:16,859 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2010-01-05 15:34:25,609 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2010-01-05 15:34:25,609 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
.............
.......
2010-01-05 15:34:26,828 INFO de.hpi.hadoop.duplicates.DuplicateFinder: Finished with mapper commands.

Mapper that does not fail:

2010-01-05 15:34:16,656 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
2010-01-05 15:34:16,828 INFO de.hpi.hadoop.duplicates.LongRecordReader: Splitting from 800 to 1600 length: 800
2010-01-05 15:34:16,843 INFO org.apache.hadoop.mapred.MapTask: numReduceTasks: 2
2010-01-05 15:34:16,859 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2010-01-05 15:34:25,609 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2010-01-05 15:34:25,609 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
.............
.......
2010-01-05 15:34:26,765 INFO de.hpi.hadoop.duplicates.DuplicateFinder: Finished with mapper commands.
2010-01-05 15:34:26,765 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
2010-01-05 15:34:27,531 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
2010-01-05 15:34:27,578 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201001051529_0002_m_000001_0 is done. And is in the process of commiting
2010-01-05 15:34:27,656 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201001051529_0002_m_000001_0' done.


Please let us know if you have faced a similar problem, know the
solution, or need more information.


Thanks,
Ziawasch Abedjan


package de.hpi.hadoop.duplicates;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

public class LongRecordReader implements RecordReader<LongWritable, Text> {
	private long start;
	private long pos;
	private long end;
	private LongReader in;
	private LongWritable key = null;
	private Text value = null;
	
	private static final Log LOG = LogFactory.getLog(LongRecordReader.class);
	
	public LongRecordReader(FileSplit split, Configuration job) throws IOException {
		
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		
		LOG.info("Splitting from " + start + " to " + end + " length: " + split.getLength());

		// open the file and seek to the start of the split
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		if (start != 0) {
			fileIn.seek(start);
		}
		in = new LongReader(fileIn, job);
		this.pos = start;
	}

	public LongWritable getCurrentKey() {
		return key;
	}

	public Text getCurrentValue() {
		return value;
	}

	public float getProgress() {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float) (end - start));
		}
	}

	public synchronized void close() throws IOException {
		if (in != null) {
			in.close();
		}
	}

	public LongWritable createKey() {
		return new LongWritable();
	}

	public Text createValue() {
		return new Text();
	}

	public long getPos() throws IOException {
		return pos;
	}

	public boolean next(LongWritable key, Text value) throws IOException {
		if (key == null) {
			key = new LongWritable();
		}

		key.set(pos);
		if (value == null) {
			value = new Text();
		}

		int size = in.readLongValues(value, (int) (end - pos));

		if (size == 0) {
			key = null;
			value = null;
			return false;
		} else {
			return true;
		}
	}

	class LongReader {
		private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
		private int bufferSize = DEFAULT_BUFFER_SIZE;
		private InputStream in;
		private byte[] buffer;
		private int bufferLength = 0;
		private int bufferPos = 0;

		public LongReader(InputStream in) {
			this(in, DEFAULT_BUFFER_SIZE);
		}

		public LongReader(InputStream in, int bufferSize) {
			this.in = in;
			this.bufferSize = bufferSize;
			this.buffer = new byte[this.bufferSize];
		}

		public LongReader(InputStream in, Configuration conf)
				throws IOException {
			this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
		}

		boolean fillBuffer() throws IOException {
			bufferPos = 0;
			bufferLength = in.read(buffer);
			return bufferLength > 0;
		}

		public void close() throws IOException {
			in.close();
		}

		public int readLongValues(Text str, int bytesToRead) throws IOException {
			assert (bytesToRead % 8 == 0);

			int startPos = bufferPos;
			int stopPos;
			long bytesConsumed = 0;

			// fill buffer if current data in buffer was already used
			// if buffer cannot be filled: EOF reached
			while (bytesConsumed < bytesToRead) {
				if (bufferPos >= bufferLength) {
					if (!fillBuffer()) {
						break;
					}
				}
				startPos = bufferPos;
				stopPos = (int) Math.min(bufferLength - bufferPos, bytesToRead
						- bytesConsumed);
				for (; bufferPos < stopPos; bufferPos += 8) {
					str.append(buffer, bufferPos, 8);
				}

				bytesConsumed += bufferPos - startPos;
			}

			return (int) bytesConsumed;
		}

		public int readLongValues(Text str) throws IOException {
			return readLongValues(str, Integer.MAX_VALUE);
		}
	}
}
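
For comparison, the chunked read that readLongValues is meant to perform can be sketched in isolation. This is only a minimal stand-alone sketch under stated assumptions, not the attached code: the class ChunkedLongReadSketch and the method readChunk are hypothetical names, a ByteArrayOutputStream stands in for Text, and the input is a 1600-byte in-memory array read as a split from 0 to 800. In the sketch the copy is bounded by a byte count taken from the current buffer position, and the caller advances its own position after every record. It may be worth checking the attached reader against this, since a loop bound that is a length rather than a buffer position would stop making progress once bufferPos is non-zero.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-alone sketch; it does not depend on Hadoop.
final class ChunkedLongReadSketch {
    private final InputStream in;
    private final byte[] buffer;
    private int bufferLength = 0; // number of valid bytes in buffer
    private int bufferPos = 0;    // next unread byte in buffer

    ChunkedLongReadSketch(InputStream in, int bufferSize) {
        this.in = in;
        this.buffer = new byte[bufferSize];
    }

    // Copies up to bytesToRead bytes into out and returns the number of
    // bytes copied. Returns 0 only if the stream is already at EOF.
    int readChunk(ByteArrayOutputStream out, int bytesToRead) throws IOException {
        int consumed = 0;
        while (consumed < bytesToRead) {
            if (bufferPos >= bufferLength) {
                // Buffer exhausted: refill, or stop at EOF.
                bufferPos = 0;
                bufferLength = in.read(buffer);
                if (bufferLength <= 0) {
                    bufferLength = 0;
                    break;
                }
            }
            // n is a byte count measured from bufferPos, never an index.
            int n = Math.min(bufferLength - bufferPos, bytesToRead - consumed);
            out.write(buffer, bufferPos, n);
            bufferPos += n;
            consumed += n;
        }
        return consumed;
    }

    public static void main(String[] args) throws IOException {
        // Assumed input: 1600 bytes, read as the split [0, 800).
        ChunkedLongReadSketch reader =
                new ChunkedLongReadSketch(new ByteArrayInputStream(new byte[1600]), 4096);
        long pos = 0;
        long end = 800;
        int records = 0;
        while (pos < end) {
            int size = reader.readChunk(new ByteArrayOutputStream(), (int) (end - pos));
            if (size == 0) {
                break; // EOF before the end of the split
            }
            pos += size; // advance, so the next request shrinks and the loop ends
            records++;
        }
        System.out.println(records + " record(s), pos=" + pos);
    }
}
```

With these assumed inputs the loop in main stops after one record because pos reaches end, even though the buffer still holds the bytes of the following split.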
package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import de.hpi.hadoop.duplicates.LongRecordReader;

public class LongFileInputFormat extends FileInputFormat<LongWritable, Text> {

	@Override
	public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf,
			Reporter reporter) throws IOException {
		reporter.setStatus(split.toString());
		return new LongRecordReader((FileSplit) split, conf);
	}
	
	@Override
	protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
		long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
		if(splitSize % 8 != 0) {
			return splitSize - (splitSize % 8);
		} else {
			return splitSize;
		}
	}
}
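
The rounding in computeSplitSize can be checked in isolation. The following is a minimal sketch with a hypothetical class name (SplitSizeAlignmentSketch) and made-up input sizes; it only reproduces the arithmetic above, which rounds the split size down to a multiple of 8 bytes, the size of one long.

```java
// Hypothetical stand-alone sketch of the split size arithmetic; no Hadoop needed.
final class SplitSizeAlignmentSketch {

    // Same formula as computeSplitSize above. Subtracting the remainder is a
    // no-op when splitSize is already a multiple of 8.
    static long alignedSplitSize(long goalSize, long minSize, long blockSize) {
        long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
        return splitSize - (splitSize % 8);
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024; // assumed block size, for illustration only

        System.out.println(alignedSplitSize(800, 1, blockSize)); // 800
        System.out.println(alignedSplitSize(803, 1, blockSize)); // 800
        // Edge case: anything below 8 rounds down to 0.
        System.out.println(alignedSplitSize(5, 1, blockSize));   // 0
    }
}
```

The last call shows that a computed split size below 8 bytes rounds down to 0, so very small inputs or a high requested number of map tasks may need a lower bound of 8.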
