Sure,

our FileInputFormat implementation:


public class CVSInputFormat extends

        FileInputFormat<FileValidatorDescriptor, Text> {


    /*

     * (non-Javadoc)

     *

     * @see

     * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache

     * .hadoop.mapreduce.InputSplit,

     * org.apache.hadoop.mapreduce.TaskAttemptContext)

     */

    @Override

    public RecordReader<FileValidatorDescriptor, Text> createRecordReader(

            InputSplit split, TaskAttemptContext context) {

        String delimiter = context.getConfiguration().get(

                "textinputformat.record.delimiter");

        byte[] recordDelimiterBytes = null;

        if (null != delimiter)

            recordDelimiterBytes = delimiter.getBytes();

        return new CVSLineRecordReader(recordDelimiterBytes);

    }


    /*

     * (non-Javadoc)

     *

     * @see

     * org.apache.hadoop.mapreduce.lib.input.FileInputFormat#isSplitable(org

     * .apache.hadoop.mapreduce.JobContext, org.apache.hadoop.fs.Path)

     */

    @Override

    protected boolean isSplitable(JobContext context, Path file) {

        CompressionCodec codec = new CompressionCodecFactory(

                context.getConfiguration()).getCodec(file);

        return codec == null;

    }

}


the recordReader:



public class CVSLineRecordReader extends

        RecordReader<FileValidatorDescriptor, Text> {

    private static final Log LOG = LogFactory.getLog(CVSLineRecordReader.class);


    public static final String CVS_FIRST_LINE = "file.first.line";


    private long start;

    private long pos;

    private long end;

    private LineReader in;

    private int maxLineLength;

    private FileValidatorDescriptor key = null;

    private Text value = null;

    private Text data = null;

    private byte[] recordDelimiterBytes;


    public CVSLineRecordReader(byte[] recordDelimiter) {

        this.recordDelimiterBytes = recordDelimiter;

    }


    @Override

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)

            throws IOException {

        Properties properties = new Properties();

        Configuration configuration = context.getConfiguration();


        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context

                .getConfiguration());

        for (Path cacheFile : cacheFiles) {

            if (cacheFile.toString().endsWith(

                    context.getConfiguration().get(VALIDATOR_CONF_PATH))) {

                properties.load(new FileReader(cacheFile.toString()));

            }

        }


        FileSplit split = (FileSplit) genericSplit;

        Configuration job = context.getConfiguration();

        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",

                Integer.MAX_VALUE);

        start = split.getStart();

        end = start + split.getLength();

        pos = start;

        final Path file = split.getPath();


        // open the file and seek to the start of the split

        FileSystem fs = file.getFileSystem(job);

        FSDataInputStream fileIn = fs.open(split.getPath());


        this.in = generateReader(fileIn, job);


        // if CVS_FIRST_LINE does not exist in conf then the csv file first line

        // is the header

        if (properties.containsKey(CVS_FIRST_LINE)) {

            configuration.set(CVS_FIRST_LINE, properties.get(CVS_FIRST_LINE)

                    .toString());

        } else {

            readData();

            configuration.set(CVS_FIRST_LINE, data.toString());

            if (start != 0) {

                fileIn.seek(start);

                in = generateReader(fileIn, job);

                pos = start;

            }

        }


        key = new FileValidatorDescriptor();

        key.setFileName(split.getPath().getName());

        context.getConfiguration().set("file.name", key.getFileName());


    }


    @Override

    public boolean nextKeyValue() throws IOException {

        int newSize = readData();

        if (newSize == 0) {

            key = null;

            value = null;

            return false;

        } else {

            key.setOffset(this.pos);

            value = data;

            return true;

        }

    }


    private LineReader generateReader(FSDataInputStream fileIn,

            Configuration job) throws IOException {

        if (null == this.recordDelimiterBytes) {

            return new LineReader(fileIn, job);

        } else {

            return new LineReader(fileIn, job, this.recordDelimiterBytes);

        }


    }


    private int readData() throws IOException {

        if (data == null) {

            data = new Text();

        }

        int newSize = 0;

        while (pos < end) {

            newSize = in.readLine(data, maxLineLength,

                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),

                            maxLineLength));

            if (newSize == 0) {

                break;

            }

            pos += newSize;

            if (newSize < maxLineLength) {

                break;

            }


            // line too long. try again

            LOG.info("Skipped line of size " + newSize + " at pos "

                    + (pos - newSize));

        }

        return newSize;

    }


    @Override

    public FileValidatorDescriptor getCurrentKey() {

        return key;

    }


    @Override

    public Text getCurrentValue() {

        return value;

    }


    @Override

    public float getProgress() {

        if (start == end) {

            return 0.0f;

        } else {

            return Math.min(1.0f, (pos - start) / (float) (end - start));

        }

    }


    @Override

    public synchronized void close() throws IOException {

        if (in != null) {

            in.close();

        }

    }

}

Thanks.

De: Azuryy Yu <azury...@gmail.com<mailto:azury...@gmail.com>>
Responder a: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" 
<user@hadoop.apache.org<mailto:user@hadoop.apache.org>>
Fecha: viernes, 22 de noviembre de 2013 12:19
Para: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" 
<user@hadoop.apache.org<mailto:user@hadoop.apache.org>>
Asunto: Re: Missing records from HDFS

I do think this is because of your RecorderReader, can you paste your code 
here? and give a piece of data example.

please use pastebin if you want.


On Fri, Nov 22, 2013 at 7:16 PM, ZORAIDA HIDALGO SANCHEZ 
<zora...@tid.es<mailto:zora...@tid.es>> wrote:
One more thing,

if we split the files then all the records are processed. Files are of 70,5MB.

Thanks,

Zoraida.-

De: zoraida <zora...@tid.es<mailto:zora...@tid.es>>
Fecha: viernes, 22 de noviembre de 2013 08:59

Para: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" 
<user@hadoop.apache.org<mailto:user@hadoop.apache.org>>
Asunto: Re: Missing records from HDFS

Thanks for your response Azuryy.

My hadoop version: 2.0.0-cdh4.3.0
InputFormat: a custom class that extends from FileInputFormat(csv input format)
These fiels are under the same directory, different files.
My input path is configured using oozie throughout the propertie 
mapred.input.dir.


Same code and input running on Hadoop 2.0.0-cdh4.2.1 works fine. Does not 
discard any record.

Thanks.

De: Azuryy Yu <azury...@gmail.com<mailto:azury...@gmail.com>>
Responder a: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" 
<user@hadoop.apache.org<mailto:user@hadoop.apache.org>>
Fecha: jueves, 21 de noviembre de 2013 07:31
Para: "user@hadoop.apache.org<mailto:user@hadoop.apache.org>" 
<user@hadoop.apache.org<mailto:user@hadoop.apache.org>>
Asunto: Re: Missing records from HDFS

what's your hadoop version? and which InputFormat are you used?

these files under one directory or there are lots of subdirectory? how ddi you 
configure input path in your main?



On Thu, Nov 21, 2013 at 12:25 AM, ZORAIDA HIDALGO SANCHEZ 
<zora...@tid.es<mailto:zora...@tid.es>> wrote:
Hi all,

my job is not reading all the input records. In the input directory I have a 
set of files containing a total of 6000000 records but only 5997000 are 
processed. The Map Input Records counter says 5997000.
I have tried downloading the files with a getmerge to check how many records 
would return but the correct number is returned(6000000).

Do you have any suggestion?

Thanks.

________________________________

Este mensaje se dirige exclusivamente a su destinatario. Puede consultar 
nuestra política de envío y recepción de correo electrónico en el enlace 
situado más abajo.
This message is intended exclusively for its addressee. We only send and 
receive email on the basis of the terms set out at:
http://www.tid.es/ES/PAGINAS/disclaimer.aspx


________________________________

Este mensaje se dirige exclusivamente a su destinatario. Puede consultar 
nuestra política de envío y recepción de correo electrónico en el enlace 
situado más abajo.
This message is intended exclusively for its addressee. We only send and 
receive email on the basis of the terms set out at:
http://www.tid.es/ES/PAGINAS/disclaimer.aspx


________________________________

Este mensaje se dirige exclusivamente a su destinatario. Puede consultar 
nuestra política de envío y recepción de correo electrónico en el enlace 
situado más abajo.
This message is intended exclusively for its addressee. We only send and 
receive email on the basis of the terms set out at:
http://www.tid.es/ES/PAGINAS/disclaimer.aspx

Reply via email to