Hey Steve, would you like to contribute this as an example to MapReduce? I'd be glad to help.
package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;

import java.io.*;
import java.util.*;

/**
 * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
 * <p>
 * Splitter that reads scan tags from an XML file and emits one record per
 * complete {@code <tag ...> ... </tag>} element. No assumption is made about
 * line boundaries, but start and end tags MUST look like {@code <MyTag}
 * and {@code </MyTag>} with no embedded spaces.
 * <p>
 * Usually you will subclass and hard-code the tag you want to split on.
 * <p>
 * Keys are file names; values are the text of one complete element.
 */
public class XMLTagInputFormat extends FileInputFormat<Text, Text> {

    public static final XMLTagInputFormat[] EMPTY_ARRAY = {};

    /** Allow the last chunk to exceed the split size by 10% before forcing another split. */
    private static final double SPLIT_SLOP = 1.1;

    /** Size of the character buffer used by the record reader. */
    public static final int BUFFER_SIZE = 4096;

    private final String m_BaseTag;
    private final String m_StartTag;
    private final String m_EndTag;
    private String m_Extension;

    /**
     * @param pBaseTag tag name to split on, e.g. "scan" matches
     *                 {@code <scan ...> ... </scan>}
     */
    public XMLTagInputFormat(final String pBaseTag) {
        m_BaseTag = pBaseTag;
        m_StartTag = "<" + pBaseTag;
        m_EndTag = "</" + pBaseTag + ">";
    }

    public String getExtension() {
        return m_Extension;
    }

    public void setExtension(final String pExtension) {
        m_Extension = pExtension;
    }

    /** @return true if this split points at a file this format is willing to read */
    public boolean isSplitReadable(InputSplit split) {
        if (!(split instanceof FileSplit))
            return true;
        FileSplit fsplit = (FileSplit) split;
        return isPathAcceptable(fsplit.getPath());
    }

    /**
     * Decide whether a file should be processed: reducer outputs
     * ({@code part-r-*}), files with the configured extension (optionally
     * gzipped), or everything when no extension is configured.
     */
    protected boolean isPathAcceptable(final Path pPath1) {
        // BUG FIX: the original tested the full path string (e.g.
        // "hdfs://host/dir/part-r-00000"), which never starts with "part-r-";
        // the file name is what must be tested.
        String fileName = pPath1.getName().toLowerCase();
        if (fileName.startsWith("part-r-"))
            return true;
        String extension = getExtension();
        if (extension == null)
            return true; // no filter configured - accept everything
        String wanted = extension.toLowerCase();
        String path = pPath1.toString().toLowerCase();
        return path.endsWith(wanted) || path.endsWith(wanted + ".gz");
    }

    public String getStartTag() {
        return m_StartTag;
    }

    public String getBaseTag() {
        return m_BaseTag;
    }

    public String getEndTag() {
        return m_EndTag;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        if (isSplitReadable(split))
            return new MyXMLFileReader();
        return NullRecordReader.INSTANCE; // do not read unacceptable files
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // gzip streams cannot be split - a single reader must consume the whole file
        return !file.getName().toLowerCase().endsWith(".gz");
    }

    /**
     * Generate the list of files and make them into FileSplits.
     * Copied from FileInputFormat so a filter on acceptable data can be inserted.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (FileStatus file : listStatus(job)) {
            Path path = file.getPath();
            if (!isPathAcceptable(path)) // filter acceptable data
                continue;
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            long length = file.getLen();
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            if ((length != 0) && isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;
                // keep carving fixed-size splits while more than SPLIT_SLOP of a split remains
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkLocations.length - 1].getHosts()));
                }
            }
            else if (length != 0) {
                // unsplittable (e.g. gzipped) non-empty file: one split for the whole file
                splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
            }
            else {
                // create empty hosts array for zero-length files
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }
        return splits;
    }

    /**
     * Custom RecordReader which returns one complete XML element per record.
     * Value is the element text; key is the file name.
     */
    public class MyXMLFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long m_Start;
        private long m_End;
        private long m_Current;
        private BufferedReader m_Input;
        private Text m_Key;
        private Text m_Value = null;
        private final char[] m_Buffer = new char[BUFFER_SIZE];
        StringBuilder m_Sb = new StringBuilder();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            m_Sb.setLength(0);
            m_Start = split.getStart();
            m_End = m_Start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            if (codec != null) {
                CompressionInputStream inputStream = codec.createInputStream(fileIn);
                m_Input = new BufferedReader(new InputStreamReader(inputStream));
                // compressed input: the whole stream belongs to this reader
                m_End = Long.MAX_VALUE;
            }
            else {
                m_Input = new BufferedReader(new InputStreamReader(fileIn));
            }
            m_Current = m_Start;
            if (m_Key == null) {
                m_Key = new Text();
            }
            m_Key.set(split.getPath().getName());
            if (m_Value == null) {
                m_Value = new Text();
            }
        }

        /**
         * Look for a start tag, then read until it closes.
         *
         * @return true if there is data
         * @throws java.io.IOException on read failure
         */
        public boolean nextKeyValue() throws IOException {
            // a complete element may already be sitting in the buffer
            if (readFromCurrentBuffer())
                return true;
            int newSize = m_Input.read(m_Buffer);
            while (newSize > 0) {
                m_Current += newSize;
                m_Sb.append(m_Buffer, 0, newSize);
                if (readFromCurrentBuffer())
                    return true;
                newSize = m_Input.read(m_Buffer);
            }
            // end of stream and no complete element remains
            m_Key = null;
            m_Value = null;
            return false;
        }

        /**
         * Try to extract one complete element from the accumulated text.
         * On success, sets m_Value and keeps any trailing text for the next record.
         *
         * @return true if an element was extracted
         */
        protected boolean readFromCurrentBuffer() {
            String endTag = getEndTag();
            String buffered = m_Sb.toString();
            if (!buffered.contains(endTag))
                return false; // need more input
            // the start tag is either "<tag " (with attributes) or "<tag>"
            int index = buffered.indexOf(getStartTag() + " ");
            if (index == -1)
                index = buffered.indexOf(getStartTag() + ">");
            if (index == -1)
                return false;
            buffered = buffered.substring(index);
            // discard any text that preceded the start tag
            m_Sb.setLength(0);
            m_Sb.append(buffered);
            int endIndex = buffered.indexOf(endTag);
            if (endIndex == -1)
                return false; // the end tag seen was before this start tag; need more input
            endIndex += endTag.length();
            m_Value.set(buffered.substring(0, endIndex).trim());
            // keep the remaining text to seed the next element
            m_Sb.setLength(0);
            m_Sb.append(buffered.substring(endIndex));
            return true;
        }

        @Override
        public Text getCurrentKey() {
            return m_Key;
        }

        @Override
        public Text getCurrentValue() {
            return m_Value;
        }

        /**
         * Get the progress within the split, in [0, 1].
         */
        public float getProgress() {
            // BUG FIX: the original divided by (m_Start - m_End), which is
            // negative (or zero for an empty split), producing negative or
            // NaN progress. Divide by the split length and clamp to 1.
            if (m_End <= m_Start)
                return 0.0f;
            return Math.min(1.0f, (m_Current - m_Start) / (float) (m_End - m_Start));
        }

        public synchronized void close() throws IOException {
            if (m_Input != null) {
                m_Input.close();
            }
        }
    }
}