Daniel Kinzler has submitted this change and it was merged.

Change subject: Added documentation for Hadoop InputFormat classes
......................................................................
Added documentation for Hadoop InputFormat classes Change-Id: I9fb897d92ce33d6b71ffa7698218413f76c1b0d1 --- M client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java M client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java 2 files changed, 250 insertions(+), 184 deletions(-) Approvals: Daniel Kinzler: Verified; Looks good to me, approved diff --git a/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java b/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java index a3a4808..ce39518 100644 --- a/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java +++ b/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java @@ -1,27 +1,39 @@ package org.wikimedia.wikibase.entitysuggester.wikiparser; -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ - - import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import org.wikimedia.wikibase.entitysuggester.wikiparser.XMLInputFormat.XMLRecordReader; +/** + * InputFormat implementation to be used with Hadoop for parsing the wikidata + * XML dumps, + * + * @author Nilesh Chakraborty + */ public class WikiPageInputFormat extends FileInputFormat<LongWritable, Text> { - public static final String START_TAG = "<page>"; - public static final String END_TAG = "</page>"; + /** + * Everything between the start and end tags is treated as a single chunk + * and passed to the mapper. 
+ */ + public static final String START_TAG = "<page>"; + public static final String END_TAG = "</page>"; - @Override - public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, - JobConf conf, Reporter reporter) throws IOException { - conf.set(XMLInputFormat.START_TAG_KEY, START_TAG); - conf.set(XMLInputFormat.END_TAG_KEY, END_TAG); - return new XMLRecordReader((FileSplit) split, conf); - } + /** + * + * @param split + * @param conf + * @param reporter + * @return + * @throws IOException + */ + @Override + public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, + JobConf conf, Reporter reporter) throws IOException { + conf.set(XMLInputFormat.START_TAG_KEY, START_TAG); + conf.set(XMLInputFormat.END_TAG_KEY, END_TAG); + return new XMLRecordReader((FileSplit) split, conf); + } } \ No newline at end of file diff --git a/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java b/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java index 311fc82..4c44e8f 100644 --- a/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java +++ b/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java @@ -1,12 +1,9 @@ package org.wikimedia.wikibase.entitysuggester.wikiparser; /* - * To change this template, choose Tools | Templates - * and open the template in the editor. + * To change this template, choose Tools | Templates and open the template in + * the editor. */ - - - import java.io.DataInputStream; import java.io.IOException; import org.apache.hadoop.fs.FSDataInputStream; @@ -20,7 +17,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapred.*; - /** * Builtin InputFormat for XML, borrowed from Cloud9: {@link https * ://github.com/lintool @@ -30,184 +26,242 @@ * object. 
*/ public class XMLInputFormat extends TextInputFormat { - /** Define start tag of a complete input entry. */ - public static final String START_TAG_KEY = "xmlinput.start"; - /** Define end tag of a complete input entry. */ - public static final String END_TAG_KEY = "xmlinput.end"; - @Override - public void configure(JobConf jobConf) { - super.configure(jobConf); - } + /** + * Define start tag of a complete input entry. + */ + public static final String START_TAG_KEY = "xmlinput.start"; + /** + * Define end tag of a complete input entry. + */ + public static final String END_TAG_KEY = "xmlinput.end"; - @Override - public RecordReader<LongWritable, Text> getRecordReader( - InputSplit inputSplit, JobConf jobConf, Reporter reporter) - throws IOException { - return new XMLRecordReader((FileSplit) inputSplit, jobConf); - } - - /** - * RecordReader for XML documents Recognizes begin-of-document and - * end-of-document tags only: Returning text object of everything in between - * delimiters - */ - public static class XMLRecordReader implements - RecordReader<LongWritable, Text> { - - private byte[] startTag; - private byte[] endTag; - private long start; - private long end; - private long pos; - private DataInputStream fsin = null; - private DataOutputBuffer buffer = new DataOutputBuffer(); - - private long recordStartPos; - - public XMLRecordReader(FileSplit split, JobConf jobConf) throws IOException { - if (jobConf.get(START_TAG_KEY) == null - || jobConf.get(END_TAG_KEY) == null) - throw new RuntimeException("Error! 
XML start and end tags unspecified!"); - - startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8"); - endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8"); - - start = split.getStart(); - Path file = split.getPath(); - - CompressionCodecFactory compressionCodecs = new CompressionCodecFactory( - jobConf); - CompressionCodec codec = compressionCodecs.getCodec(file); - - FileSystem fs = file.getFileSystem(jobConf); - - if (codec != null) { - LOG.info("Reading compressed file..."); - - fsin = new DataInputStream(codec.createInputStream(fs.open(file))); - - end = Long.MAX_VALUE; - } else { - LOG.info("Reading uncompressed file..."); - - FSDataInputStream fileIn = fs.open(file); - - fileIn.seek(start); - fsin = fileIn; - - end = start + split.getLength(); - } - - recordStartPos = start; - - // Because input streams of gzipped files are not seekable (specifically, - // do not support - // getPos), we need to keep track of bytes consumed ourselves. - pos = start; + /** + * + * @param jobConf + */ + @Override + public void configure(JobConf jobConf) { + super.configure(jobConf); } + /** + * + * @param inputSplit + * @param jobConf + * @param reporter + * @return + * @throws IOException + */ @Override - public boolean next(LongWritable key, Text value) throws IOException { - if (pos < end) { - if (readUntilMatch(startTag, false)) { - recordStartPos = pos - startTag.length; + public RecordReader<LongWritable, Text> getRecordReader( + InputSplit inputSplit, JobConf jobConf, Reporter reporter) + throws IOException { + return new XMLRecordReader((FileSplit) inputSplit, jobConf); + } - try { - buffer.write(startTag); - if (readUntilMatch(endTag, true)) { - key.set(recordStartPos); - value.set(buffer.getData(), 0, buffer.getLength()); - return true; + /** + * RecordReader for XML documents Recognizes begin-of-document and + * end-of-document tags only: Returning text object of everything in between + * delimiters + */ + public static class XMLRecordReader implements + 
RecordReader<LongWritable, Text> { + + private byte[] startTag; + private byte[] endTag; + private long start; + private long end; + private long pos; + private DataInputStream fsin = null; + private DataOutputBuffer buffer = new DataOutputBuffer(); + private long recordStartPos; + + /** + * + * @param split + * @param jobConf + * @throws IOException + */ + public XMLRecordReader(FileSplit split, JobConf jobConf) throws IOException { + if (jobConf.get(START_TAG_KEY) == null + || jobConf.get(END_TAG_KEY) == null) { + throw new RuntimeException("Error! XML start and end tags unspecified!"); } - } finally { - // Because input streams of gzipped files are not seekable - // (specifically, do not support + + startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8"); + endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8"); + + start = split.getStart(); + Path file = split.getPath(); + + CompressionCodecFactory compressionCodecs = new CompressionCodecFactory( + jobConf); + CompressionCodec codec = compressionCodecs.getCodec(file); + + FileSystem fs = file.getFileSystem(jobConf); + + if (codec != null) { + LOG.info("Reading compressed file..."); + + fsin = new DataInputStream(codec.createInputStream(fs.open(file))); + + end = Long.MAX_VALUE; + } else { + LOG.info("Reading uncompressed file..."); + + FSDataInputStream fileIn = fs.open(file); + + fileIn.seek(start); + fsin = fileIn; + + end = start + split.getLength(); + } + + recordStartPos = start; + + // Because input streams of gzipped files are not seekable (specifically, + // do not support // getPos), we need to keep track of bytes consumed ourselves. + pos = start; + } - // This is a sanity check to make sure our internal computation of - // bytes consumed is - // accurate. This should be removed later for efficiency once we - // confirm that this code - // works correctly. 
- if (fsin instanceof Seekable) { - if (pos != ((Seekable) fsin).getPos()) { - // throw new RuntimeException("bytes consumed error!"); - LOG.info("bytes conusmed error: " + String.valueOf(pos) - + " != " + String.valueOf(((Seekable) fsin).getPos())); - } + /** + * + * @param key + * @param value + * @return + * @throws IOException + */ + @Override + public boolean next(LongWritable key, Text value) throws IOException { + if (pos < end) { + if (readUntilMatch(startTag, false)) { + recordStartPos = pos - startTag.length; + + try { + buffer.write(startTag); + if (readUntilMatch(endTag, true)) { + key.set(recordStartPos); + value.set(buffer.getData(), 0, buffer.getLength()); + return true; + } + } finally { + // Because input streams of gzipped files are not seekable + // (specifically, do not support + // getPos), we need to keep track of bytes consumed ourselves. + + // This is a sanity check to make sure our internal computation of + // bytes consumed is + // accurate. This should be removed later for efficiency once we + // confirm that this code + // works correctly. 
+ if (fsin instanceof Seekable) { + if (pos != ((Seekable) fsin).getPos()) { + // throw new RuntimeException("bytes consumed error!"); + LOG.info("bytes conusmed error: " + String.valueOf(pos) + + " != " + String.valueOf(((Seekable) fsin).getPos())); + } + } + + buffer.reset(); + } + } } - - buffer.reset(); - } + return false; } - } - return false; - } - @Override - public LongWritable createKey() { - return new LongWritable(); - } - - @Override - public Text createValue() { - return new Text(); - } - - @Override - public long getPos() throws IOException { - return pos; - } - - @Override - public void close() throws IOException { - fsin.close(); - } - - @Override - public float getProgress() throws IOException { - return ((float) (pos - start)) / ((float) (end - start)); - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } - - private boolean readUntilMatch(byte[] match, boolean withinBlock) - throws IOException { - int i = 0; - while (true) { - int b = fsin.read(); - // Increment position (bytes consumed). 
- pos++; - - // End of file: - if (b == -1) { - return false; + /** + * + * @return + */ + @Override + public LongWritable createKey() { + return new LongWritable(); } - // Save to buffer: - if (withinBlock) { - buffer.write(b); + + /** + * + * @return + */ + @Override + public Text createValue() { + return new Text(); } - // Check if we're matching: - if (b == match[i]) { - i++; - if (i >= match.length) { - return true; - } - } else { - i = 0; + + /** + * + * @return @throws IOException + */ + @Override + public long getPos() throws IOException { + return pos; } - // See if we've passed the stop point: - if (!withinBlock && i == 0 && pos >= end) { - return false; + + /** + * + * @throws IOException + */ + @Override + public void close() throws IOException { + fsin.close(); } - } + + /** + * + * @return @throws IOException + */ + @Override + public float getProgress() throws IOException { + return ((float) (pos - start)) / ((float) (end - start)); + } + + /** + * + * @return + */ + public long getStart() { + return start; + } + + /** + * + * @return + */ + public long getEnd() { + return end; + } + + private boolean readUntilMatch(byte[] match, boolean withinBlock) + throws IOException { + int i = 0; + while (true) { + int b = fsin.read(); + // Increment position (bytes consumed). 
+ pos++; + + // End of file: + if (b == -1) { + return false; + } + // Save to buffer: + if (withinBlock) { + buffer.write(b); + } + // Check if we're matching: + if (b == match[i]) { + i++; + if (i >= match.length) { + return true; + } + } else { + i = 0; + } + // See if we've passed the stop point: + if (!withinBlock && i == 0 && pos >= end) { + return false; + } + } + } } - } } \ No newline at end of file -- To view, visit https://gerrit.wikimedia.org/r/75381 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I9fb897d92ce33d6b71ffa7698218413f76c1b0d1 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/extensions/WikidataEntitySuggester Gerrit-Branch: master Gerrit-Owner: Nilesh <nil...@nileshc.com> Gerrit-Reviewer: Daniel Kinzler <daniel.kinz...@wikimedia.de> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits