Daniel Kinzler has submitted this change and it was merged.

Change subject: Added documentation for Hadoop InputFormat classes
......................................................................


Added documentation for Hadoop InputFormat classes

Change-Id: I9fb897d92ce33d6b71ffa7698218413f76c1b0d1
---
M client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java
M client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java
2 files changed, 250 insertions(+), 184 deletions(-)

Approvals:
  Daniel Kinzler: Verified; Looks good to me, approved



diff --git a/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java b/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java
index a3a4808..ce39518 100644
--- a/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java
+++ b/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/WikiPageInputFormat.java
@@ -1,27 +1,39 @@
 package org.wikimedia.wikibase.entitysuggester.wikiparser;
 
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-
-
 import java.io.IOException;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.*;
 import org.wikimedia.wikibase.entitysuggester.wikiparser.XMLInputFormat.XMLRecordReader;
 
+/**
+ * InputFormat implementation to be used with Hadoop for parsing the Wikidata
+ * XML dumps.
+ *
+ * @author Nilesh Chakraborty
+ */
 public class WikiPageInputFormat extends FileInputFormat<LongWritable, Text> {
 
-  public static final String START_TAG = "<page>";
-  public static final String END_TAG = "</page>";
+    /**
+     * Everything between the start and end tags is treated as a single chunk
+     * and passed to the mapper.
+     */
+    public static final String START_TAG = "<page>";
+    public static final String END_TAG = "</page>";
 
-  @Override
-  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
-      JobConf conf, Reporter reporter) throws IOException {
-    conf.set(XMLInputFormat.START_TAG_KEY, START_TAG);
-    conf.set(XMLInputFormat.END_TAG_KEY, END_TAG);
-    return new XMLRecordReader((FileSplit) split, conf);
-  }
+    /**
+     *
+     * @param split
+     * @param conf
+     * @param reporter
+     * @return
+     * @throws IOException
+     */
+    @Override
+    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
+            JobConf conf, Reporter reporter) throws IOException {
+        conf.set(XMLInputFormat.START_TAG_KEY, START_TAG);
+        conf.set(XMLInputFormat.END_TAG_KEY, END_TAG);
+        return new XMLRecordReader((FileSplit) split, conf);
+    }
 }
\ No newline at end of file
diff --git a/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java b/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java
index 311fc82..4c44e8f 100644
--- a/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java
+++ b/client/src/main/java/org/wikimedia/wikibase/entitysuggester/wikiparser/XMLInputFormat.java
@@ -1,12 +1,9 @@
 package org.wikimedia.wikibase.entitysuggester.wikiparser;
 
 /*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
+ * To change this template, choose Tools | Templates and open the template in
+ * the editor.
  */
-
-
-
 import java.io.DataInputStream;
 import java.io.IOException;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -20,7 +17,6 @@
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapred.*;
 
-
 /**
  * Builtin InputFormat for XML, borrowed from Cloud9: {@link https
  * ://github.com/lintool
@@ -30,184 +26,242 @@
  * object.
  */
 public class XMLInputFormat extends TextInputFormat {
-  /** Define start tag of a complete input entry. */
-  public static final String START_TAG_KEY = "xmlinput.start";
-  /** Define end tag of a complete input entry. */
-  public static final String END_TAG_KEY = "xmlinput.end";
 
-  @Override
-  public void configure(JobConf jobConf) {
-    super.configure(jobConf);
-  }
+    /**
+     * Define start tag of a complete input entry.
+     */
+    public static final String START_TAG_KEY = "xmlinput.start";
+    /**
+     * Define end tag of a complete input entry.
+     */
+    public static final String END_TAG_KEY = "xmlinput.end";
 
-  @Override
-  public RecordReader<LongWritable, Text> getRecordReader(
-      InputSplit inputSplit, JobConf jobConf, Reporter reporter)
-      throws IOException {
-    return new XMLRecordReader((FileSplit) inputSplit, jobConf);
-  }
-
-  /**
-   * RecordReader for XML documents Recognizes begin-of-document and
-   * end-of-document tags only: Returning text object of everything in between
-   * delimiters
-   */
-  public static class XMLRecordReader implements
-      RecordReader<LongWritable, Text> {
-
-    private byte[] startTag;
-    private byte[] endTag;
-    private long start;
-    private long end;
-    private long pos;
-    private DataInputStream fsin = null;
-    private DataOutputBuffer buffer = new DataOutputBuffer();
-
-    private long recordStartPos;
-
-    public XMLRecordReader(FileSplit split, JobConf jobConf) throws IOException {
-      if (jobConf.get(START_TAG_KEY) == null
-          || jobConf.get(END_TAG_KEY) == null)
-        throw new RuntimeException("Error! XML start and end tags unspecified!");
-
-      startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
-      endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");
-
-      start = split.getStart();
-      Path file = split.getPath();
-
-      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(
-          jobConf);
-      CompressionCodec codec = compressionCodecs.getCodec(file);
-
-      FileSystem fs = file.getFileSystem(jobConf);
-
-      if (codec != null) {
-        LOG.info("Reading compressed file...");
-
-        fsin = new DataInputStream(codec.createInputStream(fs.open(file)));
-
-        end = Long.MAX_VALUE;
-      } else {
-        LOG.info("Reading uncompressed file...");
-
-        FSDataInputStream fileIn = fs.open(file);
-
-        fileIn.seek(start);
-        fsin = fileIn;
-
-        end = start + split.getLength();
-      }
-
-      recordStartPos = start;
-
-      // Because input streams of gzipped files are not seekable (specifically,
-      // do not support
-      // getPos), we need to keep track of bytes consumed ourselves.
-      pos = start;
+    /**
+     *
+     * @param jobConf
+     */
+    @Override
+    public void configure(JobConf jobConf) {
+        super.configure(jobConf);
     }
 
+    /**
+     *
+     * @param inputSplit
+     * @param jobConf
+     * @param reporter
+     * @return
+     * @throws IOException
+     */
     @Override
-    public boolean next(LongWritable key, Text value) throws IOException {
-      if (pos < end) {
-        if (readUntilMatch(startTag, false)) {
-          recordStartPos = pos - startTag.length;
+    public RecordReader<LongWritable, Text> getRecordReader(
+            InputSplit inputSplit, JobConf jobConf, Reporter reporter)
+            throws IOException {
+        return new XMLRecordReader((FileSplit) inputSplit, jobConf);
+    }
 
-          try {
-            buffer.write(startTag);
-            if (readUntilMatch(endTag, true)) {
-              key.set(recordStartPos);
-              value.set(buffer.getData(), 0, buffer.getLength());
-              return true;
+    /**
+     * RecordReader for XML documents Recognizes begin-of-document and
+     * end-of-document tags only: Returning text object of everything in between
+     * delimiters
+     */
+    public static class XMLRecordReader implements
+            RecordReader<LongWritable, Text> {
+
+        private byte[] startTag;
+        private byte[] endTag;
+        private long start;
+        private long end;
+        private long pos;
+        private DataInputStream fsin = null;
+        private DataOutputBuffer buffer = new DataOutputBuffer();
+        private long recordStartPos;
+
+        /**
+         *
+         * @param split
+         * @param jobConf
+         * @throws IOException
+         */
+        public XMLRecordReader(FileSplit split, JobConf jobConf) throws IOException {
+            if (jobConf.get(START_TAG_KEY) == null
+                    || jobConf.get(END_TAG_KEY) == null) {
+                throw new RuntimeException("Error! XML start and end tags unspecified!");
             }
-          } finally {
-            // Because input streams of gzipped files are not seekable
-            // (specifically, do not support
+
+            startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
+            endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");
+
+            start = split.getStart();
+            Path file = split.getPath();
+
+            CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(
+                    jobConf);
+            CompressionCodec codec = compressionCodecs.getCodec(file);
+
+            FileSystem fs = file.getFileSystem(jobConf);
+
+            if (codec != null) {
+                LOG.info("Reading compressed file...");
+
+                fsin = new DataInputStream(codec.createInputStream(fs.open(file)));
+
+                end = Long.MAX_VALUE;
+            } else {
+                LOG.info("Reading uncompressed file...");
+
+                FSDataInputStream fileIn = fs.open(file);
+
+                fileIn.seek(start);
+                fsin = fileIn;
+
+                end = start + split.getLength();
+            }
+
+            recordStartPos = start;
+
+            // Because input streams of gzipped files are not seekable (specifically,
+            // do not support
             // getPos), we need to keep track of bytes consumed ourselves.
+            pos = start;
+        }
 
-            // This is a sanity check to make sure our internal computation of
-            // bytes consumed is
-            // accurate. This should be removed later for efficiency once we
-            // confirm that this code
-            // works correctly.
-            if (fsin instanceof Seekable) {
-              if (pos != ((Seekable) fsin).getPos()) {
-                // throw new RuntimeException("bytes consumed error!");
-                LOG.info("bytes conusmed error: " + String.valueOf(pos)
-                    + " != " + String.valueOf(((Seekable) fsin).getPos()));
-              }
+        /**
+         *
+         * @param key
+         * @param value
+         * @return
+         * @throws IOException
+         */
+        @Override
+        public boolean next(LongWritable key, Text value) throws IOException {
+            if (pos < end) {
+                if (readUntilMatch(startTag, false)) {
+                    recordStartPos = pos - startTag.length;
+
+                    try {
+                        buffer.write(startTag);
+                        if (readUntilMatch(endTag, true)) {
+                            key.set(recordStartPos);
+                            value.set(buffer.getData(), 0, buffer.getLength());
+                            return true;
+                        }
+                    } finally {
+                        // Because input streams of gzipped files are not seekable
+                        // (specifically, do not support
+                        // getPos), we need to keep track of bytes consumed ourselves.
+
+                        // This is a sanity check to make sure our internal computation of
+                        // bytes consumed is
+                        // accurate. This should be removed later for efficiency once we
+                        // confirm that this code
+                        // works correctly.
+                        if (fsin instanceof Seekable) {
+                            if (pos != ((Seekable) fsin).getPos()) {
+                                // throw new RuntimeException("bytes consumed error!");
+                                LOG.info("bytes conusmed error: " + String.valueOf(pos)
+                                        + " != " + String.valueOf(((Seekable) fsin).getPos()));
+                            }
+                        }
+
+                        buffer.reset();
+                    }
+                }
             }
-
-            buffer.reset();
-          }
+            return false;
         }
-      }
-      return false;
-    }
 
-    @Override
-    public LongWritable createKey() {
-      return new LongWritable();
-    }
-
-    @Override
-    public Text createValue() {
-      return new Text();
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      return pos;
-    }
-
-    @Override
-    public void close() throws IOException {
-      fsin.close();
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-      return ((float) (pos - start)) / ((float) (end - start));
-    }
-
-    public long getStart() {
-      return start;
-    }
-
-    public long getEnd() {
-      return end;
-    }
-
-    private boolean readUntilMatch(byte[] match, boolean withinBlock)
-        throws IOException {
-      int i = 0;
-      while (true) {
-        int b = fsin.read();
-        // Increment position (bytes consumed).
-        pos++;
-
-        // End of file:
-        if (b == -1) {
-          return false;
+        /**
+         *
+         * @return
+         */
+        @Override
+        public LongWritable createKey() {
+            return new LongWritable();
         }
-        // Save to buffer:
-        if (withinBlock) {
-          buffer.write(b);
+
+        /**
+         *
+         * @return
+         */
+        @Override
+        public Text createValue() {
+            return new Text();
         }
-        // Check if we're matching:
-        if (b == match[i]) {
-          i++;
-          if (i >= match.length) {
-            return true;
-          }
-        } else {
-          i = 0;
+
+        /**
+         *
+         * @return @throws IOException
+         */
+        @Override
+        public long getPos() throws IOException {
+            return pos;
         }
-        // See if we've passed the stop point:
-        if (!withinBlock && i == 0 && pos >= end) {
-          return false;
+
+        /**
+         *
+         * @throws IOException
+         */
+        @Override
+        public void close() throws IOException {
+            fsin.close();
         }
-      }
+
+        /**
+         *
+         * @return @throws IOException
+         */
+        @Override
+        public float getProgress() throws IOException {
+            return ((float) (pos - start)) / ((float) (end - start));
+        }
+
+        /**
+         *
+         * @return
+         */
+        public long getStart() {
+            return start;
+        }
+
+        /**
+         *
+         * @return
+         */
+        public long getEnd() {
+            return end;
+        }
+
+        private boolean readUntilMatch(byte[] match, boolean withinBlock)
+                throws IOException {
+            int i = 0;
+            while (true) {
+                int b = fsin.read();
+                // Increment position (bytes consumed).
+                pos++;
+
+                // End of file:
+                if (b == -1) {
+                    return false;
+                }
+                // Save to buffer:
+                if (withinBlock) {
+                    buffer.write(b);
+                }
+                // Check if we're matching:
+                if (b == match[i]) {
+                    i++;
+                    if (i >= match.length) {
+                        return true;
+                    }
+                } else {
+                    i = 0;
+                }
+                // See if we've passed the stop point:
+                if (!withinBlock && i == 0 && pos >= end) {
+                    return false;
+                }
+            }
+        }
     }
-  }
 }
\ No newline at end of file

-- 
To view, visit https://gerrit.wikimedia.org/r/75381
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I9fb897d92ce33d6b71ffa7698218413f76c1b0d1
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/WikidataEntitySuggester
Gerrit-Branch: master
Gerrit-Owner: Nilesh <nil...@nileshc.com>
Gerrit-Reviewer: Daniel Kinzler <daniel.kinz...@wikimedia.de>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to