HADOOP-8522. ResetableGzipOutputStream creates invalid gzip files when finish() 
and resetState() are used. Contributed by Mike Percy


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/796a0d3a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/796a0d3a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/796a0d3a

Branch: refs/heads/HDFS-7240
Commit: 796a0d3a5c661f0c3b23af9c0db2d8f3db83c322
Parents: 6d201f7
Author: Chris Douglas <cdoug...@apache.org>
Authored: Fri Nov 10 16:29:36 2017 -0800
Committer: Chris Douglas <cdoug...@apache.org>
Committed: Fri Nov 10 17:41:29 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/io/compress/GzipCodec.java    |  37 +++-
 .../hadoop/io/compress/TestGzipCodec.java       | 169 +++++++++++++++++++
 2 files changed, 201 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/796a0d3a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
index 11fcf60..9bd861d 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
@@ -41,27 +41,54 @@ import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 @InterfaceStability.Evolving
 public class GzipCodec extends DefaultCodec {
   /**
-   * A bridge that wraps around a DeflaterOutputStream to make it 
+   * A bridge that wraps around a DeflaterOutputStream to make it
    * a CompressionOutputStream.
    */
   @InterfaceStability.Evolving
   protected static class GzipOutputStream extends CompressorStream {
 
     private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+      /**
+       * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
+       * details.
+       */
+      private static final byte[] GZIP_HEADER = new byte[] {
+          0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };
+
+      private boolean reset = false;
 
       public ResetableGZIPOutputStream(OutputStream out) throws IOException {
         super(out);
       }
 
-      public void resetState() throws IOException {
-        def.reset();
+      public synchronized void resetState() throws IOException {
+        reset = true;
+      }
+
+      @Override
+      public synchronized void write(byte[] buf, int off, int len)
+          throws IOException {
+        if (reset) {
+          def.reset();
+          crc.reset();
+          out.write(GZIP_HEADER);
+          reset = false;
+        }
+        super.write(buf, off, len);
+      }
+
+      @Override
+      public synchronized void close() throws IOException {
+        reset = false;
+        super.close();
       }
+
     }
 
     public GzipOutputStream(OutputStream out) throws IOException {
       super(new ResetableGZIPOutputStream(out));
     }
-    
+
     /**
      * Allow children types to put a different type in here.
      * @param out the Deflater stream to use
@@ -69,7 +96,7 @@ public class GzipCodec extends DefaultCodec {
     protected GzipOutputStream(CompressorStream out) {
       super(out);
     }
-    
+
     @Override
     public void close() throws IOException {
       out.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/796a0d3a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestGzipCodec.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestGzipCodec.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestGzipCodec.java
new file mode 100644
index 0000000..c8c1a47
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestGzipCodec.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verify resettable compressor.
+ */
+public class TestGzipCodec {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestGzipCodec.class);
+
+  private static final String DATA1 = "Dogs don't know it's not bacon!\n";
+  private static final String DATA2 = "It's baconnnn!!\n";
+  private GzipCodec codec = new GzipCodec();
+
+  @Before
+  public void setUp() {
+    codec.setConf(new Configuration(false));
+  }
+
+  // Test simple compression.
+  @Test
+  public void testSingleCompress() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CompressionOutputStream cmpOut = codec.createOutputStream(baos);
+    cmpOut.write(DATA1.getBytes(StandardCharsets.UTF_8));
+    cmpOut.finish();
+    cmpOut.close();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    GZIPInputStream cmpIn = new GZIPInputStream(bais);
+    byte[] buf = new byte[1024];
+    int len = cmpIn.read(buf);
+    String result = new String(buf, 0, len, StandardCharsets.UTF_8);
+    assertEquals("Input must match output", DATA1, result);
+  }
+
+  // Test multi-member gzip file created via finish(), resetState().
+  @Test
+  public void testResetCompress() throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    CompressionOutputStream cmpOut = codec.createOutputStream(dob);
+    cmpOut.write(DATA1.getBytes(StandardCharsets.UTF_8));
+    cmpOut.finish();
+    cmpOut.resetState();
+    cmpOut.write(DATA2.getBytes(StandardCharsets.UTF_8));
+    cmpOut.finish();
+    cmpOut.close();
+    dob.close();
+
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(dob.getData(), 0, dob.getLength());
+    CompressionInputStream cmpIn = codec.createInputStream(dib);
+    byte[] buf = new byte[1024];
+    StringBuilder result = new StringBuilder();
+    int len = 0;
+    while (true) {
+      len = cmpIn.read(buf);
+      if (len < 0) {
+        break;
+      }
+      result.append(new String(buf, 0, len, StandardCharsets.UTF_8));
+    }
+    assertEquals("Output must match input", DATA1 + DATA2, result.toString());
+  }
+
+  // ensure all necessary methods are overridden
+  @Test
+  public void testWriteOverride() throws IOException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    LOG.info("seed: " + seed);
+    r.setSeed(seed);
+    byte[] buf = new byte[128];
+    r.nextBytes(buf);
+    DataOutputBuffer dob = new DataOutputBuffer();
+    CompressionOutputStream cmpOut = codec.createOutputStream(dob);
+    cmpOut.write(buf);
+    int i = r.nextInt(128 - 10);
+    int l = r.nextInt(128 - i);
+    cmpOut.write(buf, i, l);
+    cmpOut.write((byte)(r.nextInt() & 0xFF));
+    cmpOut.close();
+
+    r.setSeed(seed);
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(dob.getData(), 0, dob.getLength());
+    CompressionInputStream cmpIn = codec.createInputStream(dib);
+    byte[] vbuf = new byte[128];
+    assertEquals(128, cmpIn.read(vbuf));
+    assertArrayEquals(buf, vbuf);
+    r.nextBytes(vbuf);
+    int vi = r.nextInt(128 - 10);
+    int vl = r.nextInt(128 - vi);
+    assertEquals(vl, cmpIn.read(vbuf, 0, vl));
+    assertArrayEquals(Arrays.copyOfRange(buf, i,  i + l),
+        Arrays.copyOf(vbuf, vl));
+    assertEquals(r.nextInt() & 0xFF, cmpIn.read());
+    assertEquals(-1, cmpIn.read());
+  }
+
+  // don't write a new header if no data are written after reset
+  @Test
+  public void testIdempotentResetState() throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    CompressionOutputStream cmpOut = codec.createOutputStream(dob);
+    cmpOut.write(DATA1.getBytes(StandardCharsets.UTF_8));
+    cmpOut.finish();
+    cmpOut.finish();
+    cmpOut.finish();
+    cmpOut.resetState();
+    cmpOut.resetState();
+    cmpOut.finish();
+    cmpOut.resetState();
+    cmpOut.close();
+    dob.close();
+
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(dob.getData(), 0, dob.getLength());
+    CompressionInputStream cmpIn = codec.createInputStream(dib);
+    byte[] buf = new byte[1024];
+    StringBuilder result = new StringBuilder();
+    int len = 0;
+    while (true) {
+      len = cmpIn.read(buf);
+      if (len < 0) {
+        break;
+      }
+      result.append(new String(buf, 0, len, StandardCharsets.UTF_8));
+    }
+    assertEquals("Output must match input", DATA1, result.toString());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to