HADOOP-14919. BZip2 drops records when reading data in splits. Contributed by Jason Lowe


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2fae63aa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2fae63aa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2fae63aa

Branch: refs/heads/YARN-1011
Commit: 2fae63aa60c43b62bd908a9499562fe528603185
Parents: c02d2ba
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Oct 31 09:30:13 2017 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Oct 31 09:30:13 2017 -0500

----------------------------------------------------------------------
 .../apache/hadoop/io/compress/BZip2Codec.java   | 39 +---------
 .../io/compress/bzip2/CBZip2InputStream.java    | 32 +++++----
 .../hadoop/mapred/TestTextInputFormat.java      | 76 ++++++++++++++++++++
 3 files changed, 98 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2fae63aa/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 331606e..db78118 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -204,43 +204,8 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
           Seekable.class.getName());
     }
 
-    //find the position of first BZip2 start up marker
-    ((Seekable)seekableIn).seek(0);
-
-    // BZip2 start of block markers are of 6 bytes.  But the very first block
-    // also has "BZh9", making it 10 bytes.  This is the common case.  But at
-    // time stream might start without a leading BZ.
-    final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
-      CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
-    long adjStart = 0L;
-    if (start != 0) {
-      // Other than the first of file, the marker size is 6 bytes.
-      adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION
-          - (HEADER_LEN + SUB_HEADER_LEN)));
-    }
-
-    ((Seekable)seekableIn).seek(adjStart);
-    SplitCompressionInputStream in =
-      new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
-
-
-    // The following if clause handles the following case:
-    // Assume the following scenario in BZip2 compressed stream where
-    // . represent compressed data.
-    // .....[48 bit Block].....[48 bit   Block].....[48 bit Block]...
-    // ........................[47 bits][1 bit].....[48 bit Block]...
-    // ................................^[Assume a Byte alignment here]
-    // ........................................^^[current position of stream]
-    // .....................^^[We go back 10 Bytes in stream and find a Block marker]
-    // ........................................^^[We align at wrong position!]
-    // ...........................................................^^[While this pos is correct]
-
-    if (in.getPos() < start) {
-      ((Seekable)seekableIn).seek(start);
-      in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
-    }
-
-    return in;
+    ((Seekable)seekableIn).seek(start);
+    return new BZip2CompressionInputStream(seekableIn, start, end, readMode);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2fae63aa/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
index 1f7632b..bb02cf2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
@@ -52,20 +52,20 @@ import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
  * This Ant code was enhanced so that it can de-compress blocks of bzip2 data.
  * Current position in the stream is an important statistic for Hadoop. For
  * example in LineRecordReader, we solely depend on the current position in the
- * stream to know about the progess. The notion of position becomes complicated
+ * stream to know about the progress. The notion of position becomes complicated
  * for compressed files. The Hadoop splitting is done in terms of compressed
  * file. But a compressed file deflates to a large amount of data. So we have
  * handled this problem in the following way.
  *
  * On object creation time, we find the next block start delimiter. Once such a
  * marker is found, the stream stops there (we discard any read compressed data
- * in this process) and the position is updated (i.e. the caller of this class
- * will find out the stream location). At this point we are ready for actual
- * reading (i.e. decompression) of data.
+ * in this process) and the position is reported as the beginning of the block
+ * start delimiter. At this point we are ready for actual reading
+ * (i.e. decompression) of data.
  *
  * The subsequent read calls give out data. The position is updated when the
 * caller of this class has read off the current block + 1 bytes. In between the
- * block reading, position is not updated. (We can only update the postion on
+ * block reading, position is not updated. (We can only update the position on
  * block boundaries).
  * </p>
  *
@@ -204,11 +204,12 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
   * in the stream.  It can find bit patterns of length <= 63 bits.  Specifically
   * this method is used in CBZip2InputStream to find the end of block (EOB)
   * delimiter in the stream, starting from the current position of the stream.
-  * If marker is found, the stream position will be right after marker at the
-  * end of this call.
+  * If marker is found, the stream position will be at the byte containing
+  * the starting bit of the marker.
   *
   * @param marker  The bit pattern to be found in the stream
   * @param markerBitLength  No of bits in the marker
+  * @return true if the marker was found otherwise false
   *
   * @throws IOException
   * @throws IllegalArgumentException  if marketBitLength is greater than 63
@@ -224,23 +225,33 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
       long bytes = 0;
       bytes = this.bsR(markerBitLength);
       if (bytes == -1) {
+        this.reportedBytesReadFromCompressedStream =
+            this.bytesReadFromCompressedStream;
         return false;
       }
       while (true) {
         if (bytes == marker) {
+          // Report the byte position where the marker starts
+          long markerBytesRead = (markerBitLength + this.bsLive + 7) / 8;
+          this.reportedBytesReadFromCompressedStream =
+              this.bytesReadFromCompressedStream - markerBytesRead;
           return true;
-
         } else {
           bytes = bytes << 1;
           bytes = bytes & ((1L << markerBitLength) - 1);
           int oneBit = (int) this.bsR(1);
           if (oneBit != -1) {
             bytes = bytes | oneBit;
-          } else
+          } else {
+            this.reportedBytesReadFromCompressedStream =
+                this.bytesReadFromCompressedStream;
             return false;
+          }
         }
       }
     } catch (IOException ex) {
+      this.reportedBytesReadFromCompressedStream =
+          this.bytesReadFromCompressedStream;
       return false;
     }
   }
@@ -302,7 +313,6 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
     } else if (readMode == READ_MODE.BYBLOCK) {
       this.currentState = STATE.NO_PROCESS_STATE;
       skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
-      this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
       if(!skipDecompression){
         changeStateToProcessABlock();
       }
@@ -419,8 +429,6 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
       result = b;
 
       skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
-      //Exactly when we are about to start a new block, we advertise the stream position.
-      this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
 
       changeStateToProcessABlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2fae63aa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
index 67bd497..11f0bb5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
@@ -186,6 +186,82 @@ public class TestTextInputFormat {
     verifyPartitions(473608, 110, file, codec, conf);
   }
 
+  // Test a corner case when position of stream is right after BZip2 marker
+  @Test (timeout=900000)
+  public void testSplitableCodecs2() throws IOException {
+    JobConf conf = new JobConf(defaultConf);
+    // Create the codec
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec)
+      ReflectionUtils.newInstance(conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Illegal codec!");
+    }
+    Path file = new Path(workDir, "test"+codec.getDefaultExtension());
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(conf, workDir);
+
+    int length = 250000;
+    LOG.info("creating; entries = " + length);
+    // create a file with length entries
+    Writer writer =
+        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+    try {
+      for (int i = 0; i < length; i++) {
+        writer.write(Integer.toString(i));
+        writer.write("\n");
+      }
+    } finally {
+      writer.close();
+    }
+
+    // Test split positions around a block boundary where the block does
+    // not start on a byte boundary.
+    for (long splitpos = 203418; splitpos < 203430; ++splitpos) {
+      TextInputFormat format = new TextInputFormat();
+      format.configure(conf);
+      LOG.info("setting block size of the input file to " + splitpos);
+      conf.setLong("mapreduce.input.fileinputformat.split.minsize", splitpos);
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      InputSplit[] splits = format.getSplits(conf, 2);
+      LOG.info("splitting: got =        " + splits.length);
+
+      // check each split
+      BitSet bits = new BitSet(length);
+      for (int j = 0; j < splits.length; j++) {
+        LOG.debug("split[" + j + "]= " + splits[j]);
+        RecordReader<LongWritable, Text> reader =
+            format.getRecordReader(splits[j], conf, Reporter.NULL);
+        try {
+          int counter = 0;
+          while (reader.next(key, value)) {
+            int v = Integer.parseInt(value.toString());
+            LOG.debug("read " + v);
+            if (bits.get(v)) {
+              LOG.warn("conflict with " + v + " in split " + j +
+                  " at position " + reader.getPos());
+            }
+            assertFalse("Key in multiple partitions.", bits.get(v));
+            bits.set(v);
+            counter++;
+          }
+          if (counter > 0) {
+            LOG.info("splits[" + j + "]=" + splits[j] + " count=" + counter);
+          } else {
+            LOG.debug("splits[" + j + "]=" + splits[j] + " count=" + counter);
+          }
+        } finally {
+          reader.close();
+        }
+      }
+      assertEquals("Some keys in no partition.", length, bits.cardinality());
+    }
+  }
+
   private void verifyPartitions(int length, int numSplits, Path file,
       CompressionCodec codec, JobConf conf) throws IOException {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to