Sure, our FileInputFormat implementation:
public class CVSInputFormat extends FileInputFormat<FileValidatorDescriptor, Text> { /* * (non-Javadoc) * * @see * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache * .hadoop.mapreduce.InputSplit, * org.apache.hadoop.mapreduce.TaskAttemptContext) */ @Override public RecordReader<FileValidatorDescriptor, Text> createRecordReader( InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(); return new CVSLineRecordReader(recordDelimiterBytes); } /* * (non-Javadoc) * * @see * org.apache.hadoop.mapreduce.lib.input.FileInputFormat#isSplitable(org * .apache.hadoop.mapreduce.JobContext, org.apache.hadoop.fs.Path) */ @Override protected boolean isSplitable(JobContext context, Path file) { CompressionCodec codec = new CompressionCodecFactory( context.getConfiguration()).getCodec(file); return codec == null; } } the recordReader: public class CVSLineRecordReader extends RecordReader<FileValidatorDescriptor, Text> { private static final Log LOG = LogFactory.getLog(CVSLineRecordReader.class); public static final String CVS_FIRST_LINE = "file.first.line"; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private FileValidatorDescriptor key = null; private Text value = null; private Text data = null; private byte[] recordDelimiterBytes; public CVSLineRecordReader(byte[] recordDelimiter) { this.recordDelimiterBytes = recordDelimiter; } @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { Properties properties = new Properties(); Configuration configuration = context.getConfiguration(); Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context .getConfiguration()); for (Path cacheFile : cacheFiles) { if (cacheFile.toString().endsWith( context.getConfiguration().get(VALIDATOR_CONF_PATH))) { properties.load(new FileReader(cacheFile.toString())); } } FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); pos = start; final Path file = split.getPath(); // open the file and seek to the start of the split FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); this.in = generateReader(fileIn, job); // if CVS_FIRST_LINE does not exist in conf then the csv file first line // is the header if (properties.containsKey(CVS_FIRST_LINE)) { configuration.set(CVS_FIRST_LINE, properties.get(CVS_FIRST_LINE) .toString()); } else { readData(); configuration.set(CVS_FIRST_LINE, data.toString()); if (start != 0) { fileIn.seek(start); in = generateReader(fileIn, job); pos = start; } } key = new FileValidatorDescriptor(); key.setFileName(split.getPath().getName()); context.getConfiguration().set("file.name", key.getFileName()); } @Override public boolean nextKeyValue() throws IOException { int newSize = readData(); if (newSize == 0) { key = null; value = null; return false; } else { key.setOffset(this.pos); value = data; return true; } } private LineReader generateReader(FSDataInputStream fileIn, Configuration job) throws IOException { if (null == this.recordDelimiterBytes) { return new LineReader(fileIn, job); } else { return new LineReader(fileIn, job, this.recordDelimiterBytes); } } private int readData() throws IOException { if (data == null) { data = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(data, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); if (newSize == 0) { break; } pos += newSize; if (newSize < maxLineLength) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } return newSize; } @Override public FileValidatorDescriptor getCurrentKey() { return key; } @Override public Text getCurrentValue() { return value; } @Override public float getProgress() { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float) (end - start)); } } @Override public synchronized void close() throws IOException { if (in != null) { in.close(); } } } Thanks. De: Azuryy Yu <azury...@gmail.com<mailto:azury...@gmail.com>> Responder a: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" <user@hadoop.apache.org<mailto:user@hadoop.apache.org>> Fecha: viernes, 22 de noviembre de 2013 12:19 Para: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" <user@hadoop.apache.org<mailto:user@hadoop.apache.org>> Asunto: Re: Missing records from HDFS I do think this is because of your RecorderReader, can you paste your code here? and give a piece of data example. please use pastebin if you want. On Fri, Nov 22, 2013 at 7:16 PM, ZORAIDA HIDALGO SANCHEZ <zora...@tid.es<mailto:zora...@tid.es>> wrote: One more thing, if we split the files then all the records are processed. Files are of 70,5MB. Thanks, Zoraida.- De: zoraida <zora...@tid.es<mailto:zora...@tid.es>> Fecha: viernes, 22 de noviembre de 2013 08:59 Para: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" <user@hadoop.apache.org<mailto:user@hadoop.apache.org>> Asunto: Re: Missing records from HDFS Thanks for your response Azuryy. My hadoop version: 2.0.0-cdh4.3.0 InputFormat: a custom class that extends from FileInputFormat(csv input format) These fiels are under the same directory, different files. My input path is configured using oozie throughout the propertie mapred.input.dir. Same code and input running on Hadoop 2.0.0-cdh4.2.1 works fine. Does not discard any record. Thanks. De: Azuryy Yu <azury...@gmail.com<mailto:azury...@gmail.com>> Responder a: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" <user@hadoop.apache.org<mailto:user@hadoop.apache.org>> Fecha: jueves, 21 de noviembre de 2013 07:31 Para: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" <user@hadoop.apache.org<mailto:user@hadoop.apache.org>> Asunto: Re: Missing records from HDFS what's your hadoop version? and which InputFormat are you used? these files under one directory or there are lots of subdirectory? how ddi you configure input path in your main? On Thu, Nov 21, 2013 at 12:25 AM, ZORAIDA HIDALGO SANCHEZ <zora...@tid.es<mailto:zora...@tid.es>> wrote: Hi all, my job is not reading all the input records. In the input directory I have a set of files containing a total of 6000000 records but only 5997000 are processed. The Map Input Records counter says 5997000. I have tried downloading the files with a getmerge to check how many records would return but the correct number is returned(6000000). Do you have any suggestion? Thanks. ________________________________ Este mensaje se dirige exclusivamente a su destinatario. Puede consultar nuestra política de envío y recepción de correo electrónico en el enlace situado más abajo. This message is intended exclusively for its addressee. We only send and receive email on the basis of the terms set out at: http://www.tid.es/ES/PAGINAS/disclaimer.aspx ________________________________ Este mensaje se dirige exclusivamente a su destinatario. Puede consultar nuestra política de envío y recepción de correo electrónico en el enlace situado más abajo. This message is intended exclusively for its addressee. We only send and receive email on the basis of the terms set out at: http://www.tid.es/ES/PAGINAS/disclaimer.aspx ________________________________ Este mensaje se dirige exclusivamente a su destinatario. Puede consultar nuestra política de envío y recepción de correo electrónico en el enlace situado más abajo. This message is intended exclusively for its addressee. We only send and receive email on the basis of the terms set out at: http://www.tid.es/ES/PAGINAS/disclaimer.aspx