Author: jlowe
Date: Thu Apr 11 21:17:56 2013
New Revision: 1467090

URL: http://svn.apache.org/r1467090
Log:
HADOOP-9233. Cover package org.apache.hadoop.io.compress.zlib with unit tests. 
Contributed by Vadim Bondarev

Added:
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1467090&r1=1467089&r2=1467090&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt 
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Apr 
11 21:17:56 2013
@@ -623,6 +623,9 @@ Release 2.0.5-beta - UNRELEASED
     HADOOP-9222. Cover package with org.apache.hadoop.io.lz4 unit tests (Vadim 
     Bondarev via jlowe)
 
+    HADOOP-9233. Cover package org.apache.hadoop.io.compress.zlib with unit
+    tests (Vadim Bondarev via jlowe)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -1627,6 +1630,9 @@ Release 0.23.8 - UNRELEASED
     HADOOP-9222. Cover package with org.apache.hadoop.io.lz4 unit tests (Vadim 
     Bondarev via jlowe)
 
+    HADOOP-9233. Cover package org.apache.hadoop.io.compress.zlib with unit
+    tests (Vadim Bondarev via jlowe)
+
 Release 0.23.7 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Added: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java?rev=1467090&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java
 (added)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java
 Thu Apr 11 21:17:56 2013
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import static org.junit.Assert.*;
+
+public class CompressDecompressTester<T extends Compressor, E extends 
Decompressor> {
+
+  private static final Logger logger = Logger
+      .getLogger(CompressDecompressTester.class);
+
+  private final byte[] originalRawData;
+
+  private ImmutableList<TesterPair<T, E>> pairs = ImmutableList.of();
+  private ImmutableList.Builder<TesterPair<T, E>> builder = 
ImmutableList.builder();     
+
+  private ImmutableSet<CompressionTestStrategy> stateges = ImmutableSet.of();
+
+  private PreAssertionTester<T, E> assertionDelegate;
+  
+  /**
+   * Creates a tester that works on a private copy of the given raw data.
+   * The pre-assertion delegate installed here keeps only the pairs for which
+   * {@code isAvailable} returns true.
+   *
+   * @param originalRawData data handed to every test strategy; it is copied,
+   *          so later changes to the caller's array are not observed
+   */
+  public CompressDecompressTester(byte[] originalRawData) {
+    this.originalRawData = originalRawData.clone();
+    this.assertionDelegate = new PreAssertionTester<T, E>() {
+      @Override
+      public ImmutableList<TesterPair<T, E>> filterOnAssumeWhat(
+          ImmutableList<TesterPair<T, E>> candidates) {
+        ImmutableList.Builder<TesterPair<T, E>> available =
+            ImmutableList.builder();
+        for (TesterPair<T, E> candidate : candidates) {
+          if (!isAvailable(candidate)) {
+            continue;
+          }
+          available.add(candidate);
+        }
+        return available.build();
+      }
+    };
+  }
+
+  /**
+   * Checks whether the native snappy library can be used by the tests.
+   *
+   * @return true only if {@code System.loadLibrary("snappy")} succeeds and
+   *         {@code NativeCodeLoader.isNativeCodeLoaded()} reports true;
+   *         false if either step fails or throws
+   */
+  private static boolean isNativeSnappyLoadable() {
+    try {
+      System.loadLibrary("snappy");
+      // Success is not a warning condition, so report it at INFO level.
+      logger.info("Snappy native library is available");
+      boolean loaded = NativeCodeLoader.isNativeCodeLoaded();
+      if (loaded) {
+        logger.info("Snappy native library loaded");
+      } else {
+        logger.warn("Snappy native library not loaded");
+      }
+      return loaded;
+    } catch (Throwable t) {
+      // loadLibrary reports a missing library with UnsatisfiedLinkError,
+      // which is an Error rather than an Exception, hence the broad catch.
+      logger.warn("Failed to load snappy: ", t);
+      return false;
+    }
+  }
+
+  /**
+   * Static factory: builds a tester for the given raw data.
+   *
+   * @param rawData data handed to the tester's constructor
+   * @return a new tester with no pairs and no test strategies registered
+   */
+  public static <T extends Compressor, E extends Decompressor>
+      CompressDecompressTester<T, E> of(byte[] rawData) {
+    CompressDecompressTester<T, E> tester =
+        new CompressDecompressTester<T, E>(rawData);
+    return tester;
+  }
+  
+
+  /**
+   * Registers a compressor/decompressor pair. The pair is named after the
+   * canonical class names of both codecs, joined with an underscore.
+   *
+   * @param compressor compressor half of the pair
+   * @param decompressor decompressor half of the pair
+   * @return this tester, for call chaining
+   */
+  public CompressDecompressTester<T, E> withCompressDecompressPair(
+      T compressor, E decompressor) {
+    String compressorName = compressor.getClass().getCanonicalName();
+    String decompressorName = decompressor.getClass().getCanonicalName();
+    String pairName = Joiner.on("_").join(compressorName, decompressorName);
+    addPair(compressor, decompressor, pairName);
+    return this;
+  }
+  
+  /**
+   * Replaces the set of test strategies run by {@code test()}.
+   *
+   * @param stateges strategies to run against every registered pair
+   * @return this tester, for call chaining
+   */
+  public CompressDecompressTester<T, E> withTestCases(
+      ImmutableSet<CompressionTestStrategy> stateges) {
+    ImmutableSet<CompressionTestStrategy> selected =
+        ImmutableSet.copyOf(stateges);
+    this.stateges = selected;
+    return this;
+  }
+
+  /** Wraps the codecs in a named TesterPair and queues it in the builder. */
+  private void addPair(T compressor, E decompressor, String name) {
+    TesterPair<T, E> pair =
+        new TesterPair<T, E>(name, compressor, decompressor);
+    builder.add(pair);
+  }
+
+  /**
+   * Builds the list of registered pairs, filters it through the pre-assertion
+   * delegate, and runs every selected strategy against every remaining pair.
+   * Each strategy receives its own copy of the raw data.
+   *
+   * The surviving pairs are ended in a finally block, so the codecs are
+   * ended even when a strategy fails an assertion.
+   *
+   * @throws InstantiationException declared for callers; not thrown by the
+   *           code visible in this method
+   * @throws IllegalAccessException declared for callers; not thrown by the
+   *           code visible in this method
+   */
+  public void test() throws InstantiationException, IllegalAccessException {
+    pairs = builder.build();
+    pairs = assertionDelegate.filterOnAssumeWhat(pairs);
+
+    try {
+      for (TesterPair<T, E> pair : pairs) {
+        for (CompressionTestStrategy strategy : stateges) {
+          strategy.getTesterStrategy().assertCompression(pair.getName(),
+              pair.getCompressor(), pair.getDecompressor(),
+              Arrays.copyOf(originalRawData, originalRawData.length));
+        }
+      }
+    } finally {
+      endAll(pairs);
+    }
+  }
+
+  /** Calls {@code end()} on every pair, in list order. */
+  private void endAll(ImmutableList<TesterPair<T, E>> pairs) {
+    for (int i = 0; i < pairs.size(); i++) {
+      pairs.get(i).end();
+    }
+  }
+
+  /**
+   * Hook applied to the registered pairs before any strategy runs.
+   * {@code test()} replaces its pair list with the list returned here.
+   */
+  interface PreAssertionTester<T extends Compressor, E extends Decompressor> {
+    /**
+     * @param pairs all pairs registered on the tester
+     * @return the pairs that should actually be tested
+     */
+    ImmutableList<TesterPair<T, E>> filterOnAssumeWhat(
+        ImmutableList<TesterPair<T, E>> pairs);
+  }
+
+  public enum CompressionTestStrategy {
+
+    COMPRESS_DECOMPRESS_ERRORS(new TesterCompressionStrategy() {
+      private final Joiner joiner = Joiner.on("- ");
+
+      /**
+       * Verifies that the compressor and the decompressor reject invalid
+       * arguments: a null buffer must raise NullPointerException and a
+       * negative length must raise ArrayIndexOutOfBoundsException. Any other
+       * outcome makes the corresponding check return false and fails the
+       * assertion.
+       */
+      @Override
+      public void assertCompression(String name, Compressor compressor,
+          Decompressor decompressor, byte[] rawData) {
+        assertTrue(checkSetInputNullPointerException(compressor));
+        assertTrue(checkSetInputNullPointerException(decompressor));
+
+        assertTrue(checkCompressArrayIndexOutOfBoundsException(compressor,
+            rawData));
+        assertTrue(checkCompressArrayIndexOutOfBoundsException(decompressor,
+            rawData));
+
+        assertTrue(checkCompressNullPointerException(compressor, rawData));
+        assertTrue(checkCompressNullPointerException(decompressor, rawData));
+
+        assertTrue(checkSetInputArrayIndexOutOfBoundsException(compressor));
+        assertTrue(checkSetInputArrayIndexOutOfBoundsException(decompressor));
+      }
+
+      /** True only if setInput(null, 0, 1) throws NullPointerException. */
+      private boolean checkSetInputNullPointerException(
+          Compressor compressor) {
+        try {
+          compressor.setInput(null, 0, 1);
+        } catch (NullPointerException npe) {
+          return true;
+        } catch (Exception ex) {
+          // Log the unexpected exception itself, not just the check name.
+          logger.error(joiner.join(compressor.getClass().getCanonicalName(),
+              "checkSetInputNullPointerException error !!!"), ex);
+        }
+        return false;
+      }
+
+      /** True only if compress(null, 0, 1) throws NullPointerException. */
+      private boolean checkCompressNullPointerException(Compressor compressor,
+          byte[] rawData) {
+        try {
+          compressor.setInput(rawData, 0, rawData.length);
+          compressor.compress(null, 0, 1);
+        } catch (NullPointerException npe) {
+          return true;
+        } catch (Exception ex) {
+          logger.error(joiner.join(compressor.getClass().getCanonicalName(),
+              "checkCompressNullPointerException error !!!"), ex);
+        }
+        return false;
+      }
+
+      /** True only if decompress(null, 0, 1) throws NullPointerException. */
+      private boolean checkCompressNullPointerException(
+          Decompressor decompressor, byte[] rawData) {
+        try {
+          decompressor.setInput(rawData, 0, rawData.length);
+          decompressor.decompress(null, 0, 1);
+        } catch (NullPointerException npe) {
+          return true;
+        } catch (Exception ex) {
+          logger.error(joiner.join(decompressor.getClass().getCanonicalName(),
+              "checkCompressNullPointerException error !!!"), ex);
+        }
+        return false;
+      }
+
+      /** True only if setInput(null, 0, 1) throws NullPointerException. */
+      private boolean checkSetInputNullPointerException(
+          Decompressor decompressor) {
+        try {
+          decompressor.setInput(null, 0, 1);
+        } catch (NullPointerException npe) {
+          return true;
+        } catch (Exception ex) {
+          logger.error(joiner.join(decompressor.getClass().getCanonicalName(),
+              "checkSetInputNullPointerException error !!!"), ex);
+        }
+        return false;
+      }
+
+      /**
+       * True only if setInput with a negative length throws
+       * ArrayIndexOutOfBoundsException.
+       */
+      private boolean checkSetInputArrayIndexOutOfBoundsException(
+          Compressor compressor) {
+        try {
+          compressor.setInput(new byte[] { (byte) 0 }, 0, -1);
+        } catch (ArrayIndexOutOfBoundsException e) {
+          return true;
+        } catch (Exception e) {
+          logger.error(joiner.join(compressor.getClass().getCanonicalName(),
+              "checkSetInputArrayIndexOutOfBoundsException error !!!"), e);
+        }
+        return false;
+      }
+
+      /**
+       * True only if compress with a negative length throws
+       * ArrayIndexOutOfBoundsException.
+       */
+      private boolean checkCompressArrayIndexOutOfBoundsException(
+          Compressor compressor, byte[] rawData) {
+        try {
+          compressor.setInput(rawData, 0, rawData.length);
+          compressor.compress(new byte[rawData.length], 0, -1);
+        } catch (ArrayIndexOutOfBoundsException e) {
+          return true;
+        } catch (Exception e) {
+          logger.error(joiner.join(compressor.getClass().getCanonicalName(),
+              "checkCompressArrayIndexOutOfBoundsException error !!!"), e);
+        }
+        return false;
+      }
+
+      /**
+       * True only if decompress with a negative length throws
+       * ArrayIndexOutOfBoundsException.
+       */
+      private boolean checkCompressArrayIndexOutOfBoundsException(
+          Decompressor decompressor, byte[] rawData) {
+        try {
+          decompressor.setInput(rawData, 0, rawData.length);
+          decompressor.decompress(new byte[rawData.length], 0, -1);
+        } catch (ArrayIndexOutOfBoundsException e) {
+          return true;
+        } catch (Exception e) {
+          logger.error(joiner.join(decompressor.getClass().getCanonicalName(),
+              "checkCompressArrayIndexOutOfBoundsException error !!!"), e);
+        }
+        return false;
+      }
+
+      /**
+       * True only if setInput with a negative length throws
+       * ArrayIndexOutOfBoundsException.
+       */
+      private boolean checkSetInputArrayIndexOutOfBoundsException(
+          Decompressor decompressor) {
+        try {
+          decompressor.setInput(new byte[] { (byte) 0 }, 0, -1);
+        } catch (ArrayIndexOutOfBoundsException e) {
+          return true;
+        } catch (Exception e) {
+          // The message names this check; it used to name a different one.
+          logger.error(joiner.join(decompressor.getClass().getCanonicalName(),
+              "checkSetInputArrayIndexOutOfBoundsException error !!!"), e);
+        }
+        return false;
+      }
+
+    }),
+
+    COMPRESS_DECOMPRESS_SINGLE_BLOCK(new TesterCompressionStrategy() {
+      final Joiner joiner = Joiner.on("- ");
+
+      /**
+       * Compresses the whole input in a single setInput/finish cycle,
+       * decompresses the result, and checks that both the size and the
+       * content of the round trip match the input. Also checks needsInput()
+       * and getBytesWritten() before and after input is supplied.
+       */
+      @Override
+      public void assertCompression(String name, Compressor compressor,
+          Decompressor decompressor, byte[] rawData) {
+
+        int cSize = 0;
+        int decompressedSize = 0;
+        byte[] compressedResult = new byte[rawData.length];
+        byte[] decompressedBytes = new byte[rawData.length];
+        try {
+          assertTrue(
+              joiner.join(name, "compressor.needsInput before error !!!"),
+              compressor.needsInput());
+          assertTrue(
+              joiner.join(name, "compressor.getBytesWritten before error !!!"),
+              compressor.getBytesWritten() == 0);
+          compressor.setInput(rawData, 0, rawData.length);
+          compressor.finish();
+          // NOTE(review): every call writes at offset 0 while cSize
+          // accumulates, so this is only consistent when compression
+          // completes in one call -- confirm that is the intent.
+          while (!compressor.finished()) {
+            cSize += compressor.compress(compressedResult, 0,
+                compressedResult.length);
+          }
+          compressor.reset();
+
+          assertTrue(
+              joiner.join(name, "decompressor.needsInput() before error !!!"),
+              decompressor.needsInput());
+          decompressor.setInput(compressedResult, 0, cSize);
+          assertFalse(
+              joiner.join(name, "decompressor.needsInput() after error !!!"),
+              decompressor.needsInput());
+          // NOTE(review): decompressedSize keeps only the last call's
+          // return value -- confirm that single-call completion is intended.
+          while (!decompressor.finished()) {
+            decompressedSize = decompressor.decompress(decompressedBytes, 0,
+                decompressedBytes.length);
+          }
+          decompressor.reset();
+          assertTrue(joiner.join(name, " byte size not equals error !!!"),
+              decompressedSize == rawData.length);
+          assertArrayEquals(
+              joiner.join(name, " byte arrays not equals error !!!"), rawData,
+              decompressedBytes);
+        } catch (Exception ex) {
+          // Joiner.join rejects null elements and getMessage() may be null,
+          // so describe the exception with toString() and keep it as the
+          // cause of the assertion failure.
+          AssertionError failure = new AssertionError(
+              joiner.join(name, ex.toString()));
+          failure.initCause(ex);
+          throw failure;
+        }
+      }
+    }),
+
+    COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM(new TesterCompressionStrategy() {
+      final Joiner joiner = Joiner.on("- ");
+      // Expected size in bytes of the output produced by closing a
+      // BlockCompressorStream without writing anything, per compressor class.
+      final ImmutableMap<Class<? extends Compressor>, Integer> emptySize =
+          ImmutableMap
+          .of(Lz4Compressor.class, 4, ZlibCompressor.class, 16,
+              SnappyCompressor.class, 4, BuiltInZlibDeflater.class, 16);
+
+      /**
+       * Closes a block compressor stream without writing to it, checks the
+       * size of the bytes it emitted, then checks that reading those bytes
+       * back through a block decompressor stream immediately returns -1.
+       */
+      @Override
+      void assertCompression(String name, Compressor compressor,
+          Decompressor decompressor, byte[] originalRawData) {
+        BlockDecompressorStream blockDecompressorStream = null;
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        // close without write
+        try {
+          compressor.reset();
+          BlockCompressorStream blockCompressorStream =
+              new BlockCompressorStream(bytesOut, compressor, 1024, 0);
+          blockCompressorStream.close();
+          // check compressed output
+          byte[] buf = bytesOut.toByteArray();
+          // Look the size up as an Integer first: unboxing a missing entry
+          // would fail with a bare NullPointerException.
+          Integer expectedSize = emptySize.get(compressor.getClass());
+          assertNotNull(
+              joiner.join(name, "no empty stream size registered"),
+              expectedSize);
+          int emSize = expectedSize;
+          Assert.assertEquals(
+              joiner.join(name, "empty stream compressed output size != "
+                  + emSize), emSize, buf.length);
+          // use compressed output as input for decompression
+          ByteArrayInputStream bytesIn = new ByteArrayInputStream(buf);
+          // create decompression stream
+          blockDecompressorStream = new BlockDecompressorStream(bytesIn,
+              decompressor, 1024);
+          // no byte is available because stream was closed
+          assertEquals(joiner.join(name, " return value is not -1"), -1,
+              blockDecompressorStream.read());
+        } catch (IOException e) {
+          // Joiner.join rejects null elements and getMessage() may be null,
+          // so describe the exception with toString() and keep the cause.
+          AssertionError failure = new AssertionError(
+              joiner.join(name, e.toString()));
+          failure.initCause(e);
+          throw failure;
+        } finally {
+          // Closing the byte-array streams has no effect, so only the
+          // decompressor stream is closed, and only once.
+          if (blockDecompressorStream != null) {
+            try {
+              blockDecompressorStream.close();
+            } catch (IOException ignored) {
+              // best-effort cleanup after the assertions have run
+            }
+          }
+        }
+      }
+
+    }),
+
+    COMPRESS_DECOMPRESS_BLOCK(new TesterCompressionStrategy() {
+      private final Joiner joiner = Joiner.on("- ");
+      private static final int BLOCK_SIZE = 512;
+      private final byte[] operationBlock = new byte[BLOCK_SIZE];
+      // Head room left in each block for compression overhead, following the
+      // zlib rule of 1% of the buffer size plus 12 bytes. With integer
+      // division this is 512 / 100 + 12 = 17 bytes.
+      private static final int overheadSpace = BLOCK_SIZE / 100 + 12;
+
+      /**
+       * Compresses the input in chunks of at most BLOCK_SIZE - overheadSpace
+       * bytes, records the size of every compressed piece, decompresses the
+       * pieces one by one, and checks that the concatenated output equals
+       * the input.
+       */
+      @Override
+      public void assertCompression(String name, Compressor compressor,
+          Decompressor decompressor, byte[] originalRawData) {
+        int off = 0;
+        int len = originalRawData.length;
+        int maxSize = BLOCK_SIZE - overheadSpace;
+        int compresSize = 0;
+        List<Integer> blockLabels = new ArrayList<Integer>();
+        ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
+        ByteArrayOutputStream decompressOut = new ByteArrayOutputStream();
+        try {
+          // A plain while loop also handles inputs that fit in one chunk.
+          // Guarding the loop with "length > maxSize" skipped compression
+          // for such inputs and left nothing to decompress.
+          while (len > 0) {
+            int bufLen = Math.min(len, maxSize);
+            compressor.setInput(originalRawData, off, bufLen);
+            compressor.finish();
+            while (!compressor.finished()) {
+              compresSize = compressor.compress(operationBlock, 0,
+                  operationBlock.length);
+              compressedOut.write(operationBlock, 0, compresSize);
+              blockLabels.add(compresSize);
+            }
+            compressor.reset();
+            off += bufLen;
+            len -= bufLen;
+          }
+
+          off = 0;
+          // compressed bytes
+          byte[] compressedBytes = compressedOut.toByteArray();
+          for (Integer step : blockLabels) {
+            decompressor.setInput(compressedBytes, off, step);
+            while (!decompressor.finished()) {
+              int dSize = decompressor.decompress(operationBlock, 0,
+                  operationBlock.length);
+              decompressOut.write(operationBlock, 0, dSize);
+            }
+            decompressor.reset();
+            off = off + step;
+          }
+          assertArrayEquals(
+              joiner.join(name, "byte arrays not equals error !!!"),
+              originalRawData, decompressOut.toByteArray());
+        } catch (Exception ex) {
+          // Joiner.join rejects null elements and getMessage() may be null,
+          // so describe the exception with toString() and keep the cause.
+          AssertionError failure = new AssertionError(
+              joiner.join(name, ex.toString()));
+          failure.initCause(ex);
+          throw failure;
+        } finally {
+          try {
+            compressedOut.close();
+          } catch (IOException ignored) {
+            // closing a ByteArrayOutputStream has no effect
+          }
+          try {
+            decompressOut.close();
+          } catch (IOException ignored) {
+            // closing a ByteArrayOutputStream has no effect
+          }
+        }
+      }
+    });
+
+    // Implementation that performs the checks for this enum constant.
+    private final TesterCompressionStrategy testerStrategy;
+
+    /**
+     * @param testStrategy implementation bound to this constant
+     */
+    CompressionTestStrategy(TesterCompressionStrategy testStrategy) {
+      this.testerStrategy = testStrategy;
+    }
+
+    /**
+     * @return the implementation bound to this constant
+     */
+    public TesterCompressionStrategy getTesterStrategy() {
+      return testerStrategy;
+    }
+  }
+
+  /**
+   * Immutable holder for one named compressor/decompressor pair under test.
+   */
+  static final class TesterPair<T extends Compressor, E extends Decompressor> {
+    private final String name;
+    private final T compressor;
+    private final E decompressor;
+
+    TesterPair(String name, T compressor, E decompressor) {
+      this.name = name;
+      this.compressor = compressor;
+      this.decompressor = decompressor;
+    }
+
+    /** @return the display name given to this pair */
+    public String getName() {
+      return name;
+    }
+
+    /** @return the compressor half of the pair */
+    public T getCompressor() {
+      return compressor;
+    }
+
+    /** @return the decompressor half of the pair */
+    public E getDecompressor() {
+      return decompressor;
+    }
+
+    /**
+     * Re-initialises the compressor with a fresh Configuration, then ends
+     * the compressor and the decompressor, in that order.
+     */
+    public void end() {
+      compressor.reinit(new Configuration());
+      compressor.end();
+      decompressor.end();
+    }
+  }
+  
+  /**
+   * Decides whether the compressor of the given pair can run in the current
+   * environment.
+   *
+   * Lz4Compressor and BuiltInZlibDeflater require
+   * {@code NativeCodeLoader.isNativeCodeLoaded()}; ZlibCompressor requires
+   * {@code ZlibFactory.isNativeZlibLoaded}; SnappyCompressor requires the
+   * native snappy library to be loadable. Any other compressor type is
+   * reported as unavailable.
+   *
+   * The type checks use {@code instanceof}, so subclasses of the listed
+   * compressors are recognised as well.
+   *
+   * @param pair pair whose compressor is inspected
+   * @return true if the pair should be tested
+   */
+  private static <T extends Compressor, E extends Decompressor> boolean
+      isAvailable(TesterPair<T, E> pair) {
+    Compressor compressor = pair.compressor;
+
+    if (compressor instanceof Lz4Compressor
+        || compressor instanceof BuiltInZlibDeflater) {
+      return NativeCodeLoader.isNativeCodeLoaded();
+    }
+    if (compressor instanceof ZlibCompressor) {
+      return ZlibFactory.isNativeZlibLoaded(new Configuration());
+    }
+    if (compressor instanceof SnappyCompressor) {
+      return isNativeSnappyLoadable();
+    }
+    return false;
+  }
+  
+  /**
+   * Base class for the checks attached to each CompressionTestStrategy
+   * constant.
+   */
+  abstract static class TesterCompressionStrategy {
+
+    // Logger named after the concrete (possibly anonymous) subclass.
+    protected final Logger logger = Logger.getLogger(getClass());
+
+    /**
+     * Runs the checks of this strategy against one pair.
+     *
+     * @param name display name of the pair
+     * @param compressor compressor under test
+     * @param decompressor decompressor under test
+     * @param originalRawData uncompressed input data for the check
+     */
+    abstract void assertCompression(String name, Compressor compressor,
+        Decompressor decompressor, byte[] originalRawData);
+  }
+}

Added: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java?rev=1467090&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java
 (added)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java
 Thu Apr 11 21:17:56 2013
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import static org.junit.Assert.fail;
+import java.util.Random;
+import 
org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
+import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
+import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
+import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
+import org.junit.Test;
+import com.google.common.collect.ImmutableSet;
+
+/** 
+ * Test for pairs:
+ * <pre>
+ * SnappyCompressor/SnappyDecompressor
+ * Lz4Compressor/Lz4Decompressor
+ * BuiltInZlibDeflater/BuiltInZlibInflater
+ *
+ *
+ * Note: ZlibCompressor/ZlibDecompressor cannot be used here because their
+ * constructors can throw an exception if the native libraries are not found.
+ * The ZlibCompressor/ZlibDecompressor pair is tested in
+ * {@code TestZlibCompressorDecompressor}.
+ *
+ * </pre>
+ *
+ */
+public class TestCompressorDecompressor {
+
+  // Fixed seed so the generated test data is reproducible.
+  private static final Random rnd = new Random(12345L);
+
+  /**
+   * Runs every test strategy against the snappy, lz4 and built-in zlib pairs
+   * created with their no-argument constructors.
+   *
+   * Unexpected exceptions propagate to JUnit so the report keeps the full
+   * stack trace.
+   */
+  @Test
+  public void testCompressorDecompressor() throws Exception {
+    // NOTE(review): the original comment here read "no more for this data";
+    // presumably 44 KB is an upper bound for the default buffers -- confirm.
+    int size = 44 * 1024;
+
+    byte[] rawData = generate(size);
+    CompressDecompressTester.of(rawData)
+        .withCompressDecompressPair(new SnappyCompressor(),
+            new SnappyDecompressor())
+        .withCompressDecompressPair(new Lz4Compressor(),
+            new Lz4Decompressor())
+        .withCompressDecompressPair(new BuiltInZlibDeflater(),
+            new BuiltInZlibInflater())
+        .withTestCases(ImmutableSet.of(
+            CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK,
+            CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK,
+            CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS,
+            CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM))
+        .test();
+  }
+
+  /**
+   * Runs every test strategy against snappy and lz4 pairs constructed with
+   * explicit buffer sizes derived from the 100 KB input size.
+   *
+   * Unexpected exceptions propagate to JUnit so the report keeps the full
+   * stack trace.
+   */
+  @Test
+  public void testCompressorDecompressorWithExeedBufferLimit()
+      throws Exception {
+    int byteSize = 100 * 1024;
+    byte[] rawData = generate(byteSize);
+    CompressDecompressTester.of(rawData)
+        .withCompressDecompressPair(
+            new SnappyCompressor(byteSize + byteSize / 2),
+            new SnappyDecompressor(byteSize + byteSize / 2))
+        .withCompressDecompressPair(new Lz4Compressor(byteSize),
+            new Lz4Decompressor(byteSize))
+        .withTestCases(ImmutableSet.of(
+            CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK,
+            CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK,
+            CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS,
+            CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM))
+        .test();
+  }
+
+  /**
+   * Generates pseudo-random test data in which every byte lies in the range
+   * 0..15.
+   *
+   * @param size number of bytes to generate
+   * @return a new array of {@code size} bytes
+   */
+  public static byte[] generate(int size) {
+    byte[] array = new byte[size];
+    for (int i = 0; i < size; i++) {
+      array[i] = (byte) rnd.nextInt(16);
+    }
+    return array;
+  }
+}

Added: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java?rev=1467090&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java
 (added)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java
 Thu Apr 11 21:17:56 2013
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.zlib;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressDecompressTester;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DecompressorStream;
+import 
org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
+import org.junit.Before;
+import org.junit.Test;
+import com.google.common.collect.ImmutableSet;
+
+public class TestZlibCompressorDecompressor {
+
+  // Seeded with a constant so the generator emits the same sequence each run.
+  private static final Random random = new Random(12345L);
+
+  @Before
+  public void before() {
+    assumeTrue(ZlibFactory.isNativeZlibLoaded(new Configuration()));
+  }  
+  
+  /**
+   * Runs a default-constructed {@code ZlibCompressor}/{@code ZlibDecompressor}
+   * pair over a 44 KB payload with each tester strategy listed below. Any
+   * {@code Exception} is reported as a test failure.
+   */
+  @Test
+  public void testZlibCompressorDecompressor() {
+    try {
+      final int size = 44 * 1024;
+      final byte[] rawData = generate(size);
+      ImmutableSet<CompressionTestStrategy> strategies = ImmutableSet.of(
+          CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK,
+          CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK,
+          CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS,
+          CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM);
+
+      CompressDecompressTester.of(rawData)
+          .withCompressDecompressPair(new ZlibCompressor(),
+              new ZlibDecompressor())
+          .withTestCases(strategies)
+          .test();
+    } catch (Exception ex) {
+      fail("testCompressorDecompressor error !!!" + ex);
+    }
+  }
+  
+  /**
+   * Runs an explicitly configured zlib pair (best compression, default
+   * strategy, default header) over a 100 KB payload with each tester strategy
+   * listed below. Both constructors receive the payload size as their last
+   * argument. Any {@code Exception} is reported as a test failure.
+   */
+  @Test
+  public void testCompressorDecompressorWithExeedBufferLimit() {
+    final int byteSize = 100 * 1024;
+    final byte[] rawData = generate(byteSize);
+    try {
+      ImmutableSet<CompressionTestStrategy> strategies = ImmutableSet.of(
+          CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK,
+          CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK,
+          CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS,
+          CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM);
+
+      CompressDecompressTester.of(rawData)
+          .withCompressDecompressPair(
+              new ZlibCompressor(
+                  CompressionLevel.BEST_COMPRESSION,
+                  CompressionStrategy.DEFAULT_STRATEGY,
+                  ZlibCompressor.CompressionHeader.DEFAULT_HEADER,
+                  byteSize),
+              new ZlibDecompressor(
+                  ZlibDecompressor.CompressionHeader.DEFAULT_HEADER,
+                  byteSize))
+          .withTestCases(strategies)
+          .test();
+    } catch (Exception ex) {
+      fail("testCompressorDecompressorWithExeedBufferLimit error !!!" + ex);
+    }
+  }
+  
+  
+  /**
+   * Obtains a compressor/decompressor pair from {@code ZlibFactory} for a
+   * configuration that enables native libraries, round-trips the same 10 KB
+   * payload through the pair five times, and finally calls
+   * {@code reinit(conf)} on the compressor.
+   */
+  @Test
+  public void testZlibCompressorDecompressorWithConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
+    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
+      // The assertion re-evaluates the same check, so it fails unless the
+      // answer changed between the two calls.
+      assertTrue("ZlibFactory is using native libs against request",
+          ZlibFactory.isNativeZlibLoaded(conf));
+      return;
+    }
+
+    final int rounds = 5;
+    final int byteSize = 10 * 1024;
+    Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
+    Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
+    byte[] rawData = generate(byteSize);
+    try {
+      int round = 0;
+      while (round < rounds) {
+        compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor,
+            (ZlibDecompressor) zlibDecompressor);
+        round++;
+      }
+      zlibCompressor.reinit(conf);
+    } catch (Exception ex) {
+      fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
+    }
+  }
+
+  /**
+   * Compresses a 64 KB payload with a single {@code compress} call and
+   * decompresses it with a single {@code decompress} call, checking the
+   * compressor's finished flag and bytes-read counter along the way, that the
+   * compressed form is smaller than the input, and that the round trip
+   * reproduces the input. Both objects are reset at the end.
+   */
+  @Test
+  public void testZlibCompressDecompress() {
+    final int rawDataSize = 1024 * 64;
+    final byte[] rawData = generate(rawDataSize);
+    try {
+      ZlibCompressor compressor = new ZlibCompressor();
+      ZlibDecompressor decompressor = new ZlibDecompressor();
+      assertFalse("testZlibCompressDecompress finished error",
+          compressor.finished());
+
+      compressor.setInput(rawData, 0, rawData.length);
+      long readBeforeCompress = compressor.getBytesRead();
+      assertTrue("testZlibCompressDecompress getBytesRead before error",
+          readBeforeCompress == 0);
+      compressor.finish();
+
+      byte[] compressedResult = new byte[rawDataSize];
+      int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+      long readAfterCompress = compressor.getBytesRead();
+      assertTrue("testZlibCompressDecompress getBytesRead ather error",
+          readAfterCompress == rawDataSize);
+      assertTrue(
+          "testZlibCompressDecompress compressed size no less then original size",
+          cSize < rawDataSize);
+
+      decompressor.setInput(compressedResult, 0, cSize);
+      byte[] decompressedBytes = new byte[rawDataSize];
+      decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+      assertArrayEquals("testZlibCompressDecompress arrays not equals ",
+          rawData, decompressedBytes);
+
+      compressor.reset();
+      decompressor.reset();
+    } catch (IOException ex) {
+      fail("testZlibCompressDecompress ex !!!" + ex);
+    }
+  }
+  
+  /**
+   * Verifies that {@code setDictionary} on the factory-provided zlib
+   * compressor and decompressor rejects a null dictionary with a
+   * {@code NullPointerException} and a negative length with an
+   * {@code ArrayIndexOutOfBoundsException}.
+   *
+   * The boolean result of every check helper is asserted; previously the
+   * results were discarded, so the test could never fail on these checks.
+   */
+  @Test
+  public void testZlibCompressorDecompressorSetDictionary() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
+    if (ZlibFactory.isNativeZlibLoaded(conf)) {
+      Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
+      Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
+
+      assertTrue("compressor.setDictionary(null, 0, 1) must throw NPE",
+          checkSetDictionaryNullPointerException(zlibCompressor));
+      assertTrue("decompressor.setDictionary(null, 0, 1) must throw NPE",
+          checkSetDictionaryNullPointerException(zlibDecompressor));
+
+      assertTrue("decompressor.setDictionary with len -1 must throw AIOOBE",
+          checkSetDictionaryArrayIndexOutOfBoundsException(zlibDecompressor));
+      assertTrue("compressor.setDictionary with len -1 must throw AIOOBE",
+          checkSetDictionaryArrayIndexOutOfBoundsException(zlibCompressor));
+    } else {
+      assertTrue("ZlibFactory is using native libs against request",
+          ZlibFactory.isNativeZlibLoaded(conf));
+    }
+  }
+
+  @Test
+  public void testZlibFactory() {
+    Configuration cfg = new Configuration();
+
+    assertTrue("testZlibFactory compression level error !!!",
+        CompressionLevel.DEFAULT_COMPRESSION == ZlibFactory
+            .getCompressionLevel(cfg));
+
+    assertTrue("testZlibFactory compression strategy error !!!",
+        CompressionStrategy.DEFAULT_STRATEGY == ZlibFactory
+            .getCompressionStrategy(cfg));
+
+    ZlibFactory.setCompressionLevel(cfg, CompressionLevel.BEST_COMPRESSION);
+    assertTrue("testZlibFactory compression strategy error !!!",
+        CompressionLevel.BEST_COMPRESSION == ZlibFactory
+            .getCompressionLevel(cfg));
+
+    ZlibFactory.setCompressionStrategy(cfg, CompressionStrategy.FILTERED);
+    assertTrue("testZlibFactory compression strategy error !!!",
+        CompressionStrategy.FILTERED == 
ZlibFactory.getCompressionStrategy(cfg));
+  }
+  
+
+  /**
+   * Reports whether {@code decompressor.setDictionary(null, 0, 1)} raises a
+   * {@code NullPointerException}.
+   *
+   * @param decompressor decompressor under test
+   * @return {@code true} only when a NullPointerException was thrown;
+   *         {@code false} when the call returned normally or threw any other
+   *         {@code Exception}
+   */
+  private boolean checkSetDictionaryNullPointerException(
+      Decompressor decompressor) {
+    boolean sawNullPointer = false;
+    try {
+      decompressor.setDictionary(null, 0, 1);
+    } catch (NullPointerException expected) {
+      sawNullPointer = true;
+    } catch (Exception ignored) {
+      // any other exception is reported as "not an NPE" via the return value
+    }
+    return sawNullPointer;
+  }
+
+  /**
+   * Reports whether {@code compressor.setDictionary(null, 0, 1)} raises a
+   * {@code NullPointerException}.
+   *
+   * @param compressor compressor under test
+   * @return {@code true} only when a NullPointerException was thrown;
+   *         {@code false} when the call returned normally or threw any other
+   *         {@code Exception}
+   */
+  private boolean checkSetDictionaryNullPointerException(
+      Compressor compressor) {
+    boolean sawNullPointer = false;
+    try {
+      compressor.setDictionary(null, 0, 1);
+    } catch (NullPointerException expected) {
+      sawNullPointer = true;
+    } catch (Exception ignored) {
+      // any other exception is reported as "not an NPE" via the return value
+    }
+    return sawNullPointer;
+  }
+
+  private boolean checkSetDictionaryArrayIndexOutOfBoundsException(
+      Compressor compressor) {
+    try {
+      compressor.setDictionary(new byte[] { (byte) 0 }, 0, -1);
+    } catch (ArrayIndexOutOfBoundsException e) {
+      return true;
+    } catch (Exception e) {
+    }
+    return false;
+  }
+
+  private boolean checkSetDictionaryArrayIndexOutOfBoundsException(
+      Decompressor decompressor) {
+    try {
+      decompressor.setDictionary(new byte[] { (byte) 0 }, 0, -1);
+    } catch (ArrayIndexOutOfBoundsException e) {
+      return true;
+    } catch (Exception e) {
+    }
+    return false;
+  }
+
+  private byte[] compressDecompressZlib(byte[] rawData,
+      ZlibCompressor zlibCompressor, ZlibDecompressor zlibDecompressor)
+      throws IOException {
+    int cSize = 0;
+    byte[] compressedByte = new byte[rawData.length];
+    byte[] decompressedRawData = new byte[rawData.length];
+    zlibCompressor.setInput(rawData, 0, rawData.length);
+    zlibCompressor.finish();
+    while (!zlibCompressor.finished()) {
+      cSize = zlibCompressor.compress(compressedByte, 0, 
compressedByte.length);
+    }
+    zlibCompressor.reset();
+
+    assertTrue(zlibDecompressor.getBytesWritten() == 0);
+    assertTrue(zlibDecompressor.getBytesRead() == 0);
+    assertTrue(zlibDecompressor.needsInput());
+    zlibDecompressor.setInput(compressedByte, 0, cSize);
+    assertFalse(zlibDecompressor.needsInput());
+    while (!zlibDecompressor.finished()) {
+      zlibDecompressor.decompress(decompressedRawData, 0,
+          decompressedRawData.length);
+    }
+    assertTrue(zlibDecompressor.getBytesWritten() == rawData.length);
+    assertTrue(zlibDecompressor.getBytesRead() == cSize);
+    zlibDecompressor.reset();
+    assertTrue(zlibDecompressor.getRemaining() == 0);
+    assertArrayEquals(
+        "testZlibCompressorDecompressorWithConfiguration array equals error",
+        rawData, decompressedRawData);
+
+    return decompressedRawData;
+  }
+
+  @Test
+  public void testBuiltInGzipDecompressorExceptions() {
+    BuiltInGzipDecompressor decompresser = new BuiltInGzipDecompressor();
+    try {
+      decompresser.setInput(null, 0, 1);
+    } catch (NullPointerException ex) {
+      // expected
+    } catch (Exception ex) {
+      fail("testBuiltInGzipDecompressorExceptions npe error " + ex);
+    }
+
+    try {
+      decompresser.setInput(new byte[] { 0 }, 0, -1);
+    } catch (ArrayIndexOutOfBoundsException ex) {
+      // expected
+    } catch (Exception ex) {
+      fail("testBuiltInGzipDecompressorExceptions aioob error" + ex);
+    }        
+    
+    assertTrue("decompresser.getBytesRead error",
+        decompresser.getBytesRead() == 0);
+    assertTrue("decompresser.getRemaining error",
+        decompresser.getRemaining() == 0);
+    decompresser.reset();
+    decompresser.end();
+
+    InputStream decompStream = null;
+    try {
+      // invalid 0 and 1 bytes , must be 31, -117
+      int buffSize = 1 * 1024;
+      byte buffer[] = new byte[buffSize];
+      Decompressor decompressor = new BuiltInGzipDecompressor();
+      DataInputBuffer gzbuf = new DataInputBuffer();
+      decompStream = new DecompressorStream(gzbuf, decompressor);
+      gzbuf.reset(new byte[] { 0, 0, 1, 1, 1, 1, 11, 1, 1, 1, 1 }, 11);
+      decompStream.read(buffer);
+    } catch (IOException ioex) {
+      // expected
+    } catch (Exception ex) {
+      fail("invalid 0 and 1 byte in gzip stream" + ex);
+    }
+
+    // invalid 2 byte, must be 8
+    try {
+      int buffSize = 1 * 1024;
+      byte buffer[] = new byte[buffSize];
+      Decompressor decompressor = new BuiltInGzipDecompressor();
+      DataInputBuffer gzbuf = new DataInputBuffer();
+      decompStream = new DecompressorStream(gzbuf, decompressor);
+      gzbuf.reset(new byte[] { 31, -117, 7, 1, 1, 1, 1, 11, 1, 1, 1, 1 }, 11);
+      decompStream.read(buffer);
+    } catch (IOException ioex) {
+      // expected
+    } catch (Exception ex) {
+      fail("invalid 2 byte in gzip stream" + ex);
+    }
+
+    try {
+      int buffSize = 1 * 1024;
+      byte buffer[] = new byte[buffSize];
+      Decompressor decompressor = new BuiltInGzipDecompressor();
+      DataInputBuffer gzbuf = new DataInputBuffer();
+      decompStream = new DecompressorStream(gzbuf, decompressor);
+      gzbuf.reset(new byte[] { 31, -117, 8, -32, 1, 1, 1, 11, 1, 1, 1, 1 }, 
11);
+      decompStream.read(buffer);
+    } catch (IOException ioex) {
+      // expected
+    } catch (Exception ex) {
+      fail("invalid 3 byte in gzip stream" + ex);
+    }
+    try {
+      int buffSize = 1 * 1024;
+      byte buffer[] = new byte[buffSize];
+      Decompressor decompressor = new BuiltInGzipDecompressor();
+      DataInputBuffer gzbuf = new DataInputBuffer();
+      decompStream = new DecompressorStream(gzbuf, decompressor);
+      gzbuf.reset(new byte[] { 31, -117, 8, 4, 1, 1, 1, 11, 1, 1, 1, 1 }, 11);
+      decompStream.read(buffer);
+    } catch (IOException ioex) {
+      // expected
+    } catch (Exception ex) {
+      fail("invalid 3 byte make hasExtraField" + ex);
+    }
+  }
+  
+  /**
+   * Produces test data drawn from the class-level generator; every byte is in
+   * the range 0..15.
+   *
+   * @param size number of bytes to produce
+   * @return a new array of {@code size} bytes
+   */
+  public static byte[] generate(int size) {
+    final byte[] bytes = new byte[size];
+    for (int pos = 0; pos < bytes.length; pos++) {
+      bytes[pos] = (byte) random.nextInt(16);
+    }
+    return bytes;
+  }
+}


Reply via email to