Hmmm - I have only tested in local mode, but I got a
java.io.NotSerializableException: org.apache.hadoop.io.Text
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
Here are two classes - one works and one does not.
The attached mgf file is what they read.

showPairRDD simply prints the text that was read.
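For reference, showPairRDD amounts to roughly this (a sketch from memory - the real
method lives in SparkUtilities; the relevant point is that collect() is one place
every key and value gets pushed through Java serialization):

    // sketch only - assumes showPairRDD just collects and prints
    public static void showPairRDD(JavaPairRDD<String, String> rdd) {
        for (Tuple2<String, String> kv : rdd.collect()) {   // scala.Tuple2
            System.out.println(kv._1() + " => " + kv._2());
        }
    }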
guaranteeSparkMaster calls sparkConf.setMaster("local") if no master is defined.
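A sketch of what guaranteeSparkMaster does (again from memory, assuming
SparkConf.contains is the right test):

    // sketch only - fall back to local mode when no master was supplied
    public static void guaranteeSparkMaster(SparkConf sparkConf) {
        if (!sparkConf.contains("spark.master"))
            sparkConf.setMaster("local");
    }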

Perhaps I need to convert the Text to a String somewhere else, but I certainly
don't see where.
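If the fix is to map the Text to a String immediately after reading, before Spark
ever tries to serialize it, I would expect something like this sketch (typed with
Text, since that is what MGFTextInputFormat actually produces; needs scala.Tuple2
and org.apache.spark.api.java.function.PairFunction):

    JavaPairRDD<Text, Text> raw = ctx.newAPIHadoopFile(path,
            MGFTextInputFormat.class, Text.class, Text.class,
            ctx.hadoopConfiguration());
    // convert to plain Strings, which are Serializable, before any collect/shuffle
    JavaPairRDD<String, String> asStrings = raw.mapToPair(
            new PairFunction<Tuple2<Text, Text>, String, String>() {
                public Tuple2<String, String> call(Tuple2<Text, Text> kv) {
                    return new Tuple2<String, String>(kv._1().toString(), kv._2().toString());
                }
            });
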
package com.lordjoe.distributed.input;

/**
 * com.lordjoe.distributed.input.MGFInputFormat
 * User: Steve
 * Date: 9/24/2014
 */

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;

import java.io.*;

/**
 * com.lordjoe.distributed.input.MGFInputFormat
 * Splitter that reads mgf files
 * for files nice enough to put the BEGIN IONS and END IONS tags on separate lines
 */
public class MGFInputFormat extends FileInputFormat<StringBuffer, StringBuffer>  implements Serializable {

    private String m_Extension = "mgf";

    public MGFInputFormat() {

    }

    @SuppressWarnings("UnusedDeclaration")
    public String getExtension() {
        return m_Extension;
    }
    @SuppressWarnings("UnusedDeclaration")
    public void setExtension(final String pExtension) {
        m_Extension = pExtension;
    }


    @Override
    public RecordReader<StringBuffer, StringBuffer> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        return new MGFFileReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        final String lcName = file.getName().toLowerCase();
        //noinspection RedundantIfStatement
        if (lcName.endsWith("gz"))
            return false;
        return true;
    }

    /**
     * Custom RecordReader which returns each BEGIN IONS ... END IONS
     * block as a single record
     * Value is one spectrum block
     * Key is the file name
     */
    public class MGFFileReader extends RecordReader<StringBuffer, StringBuffer> implements Serializable {

        private CompressionCodecFactory compressionCodecs = null;
        private long m_Start;
        private long m_End;
        private long current;
        private LineReader m_Input;
        FSDataInputStream m_RealFile;
        private StringBuffer key = null;
        private StringBuffer value = null;
        private Text buffer; // must be created lazily - Text itself is not Serializable

        public Text getBuffer() {
            if(buffer == null)
                buffer = new Text();
            return buffer;
        }

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            m_Start = split.getStart();
            m_End = m_Start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            boolean skipFirstLine = false;
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the m_Start of the split
            FileSystem fs = file.getFileSystem(job);
            m_RealFile = fs.open(split.getPath());
            if (codec != null) {
                CompressionInputStream inputStream = codec.createInputStream(m_RealFile);
                m_Input = new LineReader( inputStream );
                m_End = Long.MAX_VALUE;
            }
            else {
                if (m_Start != 0) {
                    skipFirstLine = true;
                    --m_Start;
                    m_RealFile.seek(m_Start);
                }
                m_Input = new LineReader( m_RealFile);
            }
            // not at the beginning so go to first line
            if (skipFirstLine) {  // skip first line and re-establish "m_Start".
                m_Start += m_Input.readLine(getBuffer()) ;
            }

            current = m_Start;
            if (key == null) {
                key = new StringBuffer();
            }
            else {
                key.setLength(0);
            }
            key.append(split.getPath().getName());
            if (value == null) {
                value = new StringBuffer();
            }

            current = 0;
        }

        /**
         * look for a BEGIN IONS tag then read until the matching END IONS
         *
         * @return true if there is data
         * @throws java.io.IOException
         */
        public boolean nextKeyValue() throws IOException
        {
            int newSize;
            while (current < m_Start) {
                newSize = m_Input.readLine(getBuffer());
                // we are done
                if (newSize == 0) {
                    key = null;
                    value = null;
                    return false;
                }
                current = m_RealFile.getPos();
            }
            StringBuilder sb = new StringBuilder();
            newSize = m_Input.readLine(getBuffer());
            String str;
            while (newSize > 0) {
                str = buffer.toString();

                if ("BEGIN IONS".equals(str)) {
                    break;
                }
                current = m_RealFile.getPos();
                // we are done
                if (current > m_End) {
                    key = null;
                    value = null;
                    return false;

                }
                newSize = m_Input.readLine(getBuffer());
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;

            }
            while (newSize > 0) {
                str = buffer.toString();
                sb.append(str);
                sb.append("\n");
                if ("END IONS".equals(str)) {
                    break;
                }
                newSize = m_Input.readLine(getBuffer());
            }

            String s = sb.toString();
            value = new StringBuffer();
            value.append(s);

            if (sb.length() == 0) {
                key = null;
                value = null;
                return false;
            }
            else {
                return true;
            }
        }

        @Override
        public StringBuffer getCurrentKey() {
            return key;
        }

        @Override
        public StringBuffer getCurrentValue() {
            return value;
        }


        /**
         * Get the progress within the split
         */
        public float getProgress() {
            long totalBytes = m_End - m_Start;
            long totalHandled = current - m_Start;
            return ((float) totalHandled) / totalBytes;
        }


        public synchronized void close() throws IOException {
            if (m_Input != null) {
                m_Input.close();
            }
        }
    }
}
package com.lordjoe.distributed.input;

/**
 * com.lordjoe.distributed.input.MGFTextInputFormat
 * User: Steve
 * Date: 9/24/2014
 */

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;

import java.io.*;

/**
 * com.lordjoe.distributed.input.MGFTextInputFormat
 * Splitter that reads mgf files with Text as the key/value type
 * for files nice enough to put the BEGIN IONS and END IONS tags on separate lines
 */
public class MGFTextInputFormat extends FileInputFormat<Text, Text> implements Serializable {

    private String m_Extension = "mgf";

    public MGFTextInputFormat() {

    }

    @SuppressWarnings("UnusedDeclaration")
    public String getExtension() {
        return m_Extension;
    }

    @SuppressWarnings("UnusedDeclaration")
    public void setExtension(final String pExtension) {
        m_Extension = pExtension;
    }


    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        return new MGFFileReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        final String lcName = file.getName().toLowerCase();
        //noinspection RedundantIfStatement
        if (lcName.endsWith("gz"))
            return false;
        return true;
    }

    /**
     * Custom RecordReader which returns each BEGIN IONS ... END IONS
     * block as a single record
     * Value is one spectrum block
     * Key is the file name
     */
    public class MGFFileReader extends RecordReader<Text, Text> implements Serializable {

        private CompressionCodecFactory compressionCodecs = null;
        private long m_Start;
        private long m_End;
        private long current;
        private LineReader m_Input;
        FSDataInputStream m_RealFile;
        private Text key = null;
        private Text value = null;
        private Text buffer; // must be created lazily - Text itself is not Serializable

        public Text getBuffer() {
            if (buffer == null)
                buffer = new Text();
            return buffer;
        }

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            m_Start = split.getStart();
            m_End = m_Start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            boolean skipFirstLine = false;
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the m_Start of the split
            FileSystem fs = file.getFileSystem(job);
            m_RealFile = fs.open(split.getPath());
            if (codec != null) {
                CompressionInputStream inputStream = codec.createInputStream(m_RealFile);
                m_Input = new LineReader(inputStream);
                m_End = Long.MAX_VALUE;
            }
            else {
                if (m_Start != 0) {
                    skipFirstLine = true;
                    --m_Start;
                    m_RealFile.seek(m_Start);
                }
                m_Input = new LineReader(m_RealFile);
            }
            // not at the beginning so go to first line
            if (skipFirstLine) {  // skip first line and re-establish "m_Start".
                m_Start += m_Input.readLine(getBuffer());
            }

            current = m_Start;
            if (key == null) {
                key = new Text();
            }
            key.set(split.getPath().getName());
            if (value == null) {
                value = new Text();
            }

            current = 0;
        }

        /**
         * look for a BEGIN IONS tag then read until the matching END IONS
         *
         * @return true if there is data
         * @throws java.io.IOException
         */
        public boolean nextKeyValue() throws IOException {
            int newSize;
            while (current < m_Start) {
                newSize = m_Input.readLine(getBuffer());
                // we are done
                if (newSize == 0) {
                    key = null;
                    value = null;
                    return false;
                }
                current = m_RealFile.getPos();
            }
            StringBuilder sb = new StringBuilder();
            newSize = m_Input.readLine(getBuffer());
            String str;
            while (newSize > 0) {
                str = buffer.toString();

                if ("BEGIN IONS".equals(str)) {
                    break;
                }
                current = m_RealFile.getPos();
                // we are done
                if (current > m_End) {
                    key = null;
                    value = null;
                    return false;

                }
                newSize = m_Input.readLine(getBuffer());
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;

            }
            while (newSize > 0) {
                str = buffer.toString();
                sb.append(str);
                sb.append("\n");
                if ("END IONS".equals(str)) {
                    break;
                }
                newSize = m_Input.readLine(getBuffer());
            }

            String s = sb.toString();
            value = new Text();
            value.set(s);

            if (sb.length() == 0) {
                key = null;
                value = null;
                return false;
            }
            else {
                return true;
            }
        }

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }


        /**
         * Get the progress within the split
         */
        public float getProgress() {
            long totalBytes = m_End - m_Start;
            long totalHandled = current - m_Start;
            return ((float) totalHandled) / totalBytes;
        }


        public synchronized void close() throws IOException {
            if (m_Input != null) {
                m_Input.close();
            }
        }
    }
}
package com.lordjoe.distributed.input.spark;

import com.lordjoe.distributed.*;
import com.lordjoe.distributed.input.*;
import org.apache.spark.*;
import org.apache.spark.api.java.*;

/**
 * com.lordjoe.distributed.input.spark.SparkTandemUtilitiesTests
 * User: Steve
 * Date: 9/22/2014
 */
public class SparkTandemUtilitiesTests {


    public static class MZXMLInputFormat extends XMLTagInputFormat {
        public MZXMLInputFormat() {
            super("scan");
        }
    }

    public static JavaPairRDD<String, String> parseSpectrumFile(String path, JavaSparkContext ctx) {
        // swap these two calls to switch between the Text version (which throws
        // the NotSerializableException) and the StringBuffer version (which works)
        if (path.toLowerCase().endsWith(".mgf"))
            return parseAsTextMGF(path, ctx);
        //  return parseAsMGF(path, ctx);
        if (path.toLowerCase().endsWith(".mzxml"))
            return parseAsMZXML(path, ctx);
        throw new UnsupportedOperationException("Cannot understand extension " + path);
    }

    public static JavaPairRDD<String, String> parseAsMZXML(final String path, final JavaSparkContext ctx) {
        Class inputFormatClass = MZXMLInputFormat.class;
        Class keyClass = String.class;
        Class valueClass = String.class;

        return ctx.newAPIHadoopFile(
                path,
                inputFormatClass,
                keyClass,
                valueClass,
                ctx.hadoopConfiguration()
        );
    }



    public static JavaPairRDD<String, String> parseAsOldMGF(final String path, final JavaSparkContext ctx) {
        Class inputFormatClass = MGFOldInputFormat.class;
        Class keyClass = String.class;
        Class valueClass = String.class;

        return ctx.hadoopFile(
                path,
                inputFormatClass,
                keyClass,
                valueClass
        );
    }


    public static JavaPairRDD<String, String> parseAsTextMGF(final String path, final JavaSparkContext ctx) {
        // note: these raw Class types mean the compiler never checks that
        // MGFTextInputFormat (which produces Text) matches the String classes below
        Class inputFormatClass = MGFTextInputFormat.class;
        Class keyClass = String.class;
        Class valueClass = String.class;

        return ctx.newAPIHadoopFile(
                path,
                inputFormatClass,
                keyClass,
                valueClass,
                ctx.hadoopConfiguration()
        );
    }

    public static JavaPairRDD<String, String> parseAsMGF(final String path, final JavaSparkContext ctx) {
        Class inputFormatClass = MGFInputFormat.class;
        Class keyClass = String.class;
        Class valueClass = String.class;

        return ctx.newAPIHadoopFile(
                path,
                inputFormatClass,
                keyClass,
                valueClass,
                ctx.hadoopConfiguration()
        );
    }


    public static JavaPairRDD<String, String> parseSpectrumFileOld(String path, JavaSparkContext ctx) {
        Class inputFormatClass = MGFOldInputFormat.class;
        Class keyClass = String.class;
        Class valueClass = String.class;

        return ctx.hadoopFile(
                path,
                inputFormatClass,
                keyClass,
                valueClass
        );
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("usage <file holding mgfs>");
            return;
        }
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        SparkUtilities.guaranteeSparkMaster(sparkConf);    // use local if no master defined

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        for (int i = 0; i < args.length; i++) {
            JavaPairRDD<String, String> parsed = parseSpectrumFile(args[i], ctx);
            SparkUtilities.showPairRDD(parsed);
        }
    }

}
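
Another thought - would switching Spark to Kryo serialization sidestep this, since
Kryo does not require classes to implement java.io.Serializable? Something like
this (untested):

    SparkConf sparkConf = new SparkConf()
            .setAppName("JavaWordCount")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");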

Attachment: salivaryglandhealthycontrol10.mgf