Repository: hadoop
Updated Branches:
  refs/heads/trunk bbb3b1acc -> 8f9ab998e


HADOOP-10681. Remove unnecessary synchronization from Snappy & Zlib codecs. 
Contributed by Gopal Vijayaraghavan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f9ab998
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f9ab998
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f9ab998

Branch: refs/heads/trunk
Commit: 8f9ab998e273259c1e7a3ed53ba37d767e02b6bb
Parents: bbb3b1a
Author: Arun C. Murthy <acmur...@apache.org>
Authored: Sun Oct 5 07:38:21 2014 -0700
Committer: Arun C. Murthy <acmur...@apache.org>
Committed: Sun Oct 5 07:38:21 2014 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../io/compress/snappy/SnappyCompressor.java    | 24 +++---
 .../io/compress/snappy/SnappyDecompressor.java  | 26 +++---
 .../hadoop/io/compress/zlib/ZlibCompressor.java | 24 +++---
 .../io/compress/zlib/ZlibDecompressor.java      | 30 +++----
 .../hadoop/io/compress/TestCodecPool.java       | 87 ++++++++++++++++++++
 6 files changed, 142 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f9ab998/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 6a45540..f025615 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -632,6 +632,9 @@ Release 2.6.0 - UNRELEASED
     HADOOP-10731. Remove @date JavaDoc comment in ProgramDriver class (Henry 
     Saputra via aw)
 
+    HADOOP-10681. Remove unnecessary synchronization from Snappy & Zlib
+    codecs. (Gopal Vijayaraghavan via acmurthy)
+
   BUG FIXES
 
     HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f9ab998/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
index 376ea06..ab45f25 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
@@ -100,7 +100,7 @@ public class SnappyCompressor implements Compressor {
    * @param len Length
    */
   @Override
-  public synchronized void setInput(byte[] b, int off, int len) {
+  public void setInput(byte[] b, int off, int len) {
     if (b == null) {
       throw new NullPointerException();
     }
@@ -127,7 +127,7 @@ public class SnappyCompressor implements Compressor {
    * aside to be loaded by this function while the compressed data are
    * consumed.
    */
-  synchronized void setInputFromSavedData() {
+  void setInputFromSavedData() {
     if (0 >= userBufLen) {
       return;
     }
@@ -146,7 +146,7 @@ public class SnappyCompressor implements Compressor {
    * Does nothing.
    */
   @Override
-  public synchronized void setDictionary(byte[] b, int off, int len) {
+  public void setDictionary(byte[] b, int off, int len) {
     // do nothing
   }
 
@@ -158,7 +158,7 @@ public class SnappyCompressor implements Compressor {
    *         #setInput() should be called in order to provide more input.
    */
   @Override
-  public synchronized boolean needsInput() {
+  public boolean needsInput() {
     return !(compressedDirectBuf.remaining() > 0
         || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);
   }
@@ -168,7 +168,7 @@ public class SnappyCompressor implements Compressor {
    * with the current contents of the input buffer.
    */
   @Override
-  public synchronized void finish() {
+  public void finish() {
     finish = true;
   }
 
@@ -180,7 +180,7 @@ public class SnappyCompressor implements Compressor {
    *         data output stream has been reached.
    */
   @Override
-  public synchronized boolean finished() {
+  public boolean finished() {
     // Check if all uncompressed data has been consumed
     return (finish && finished && compressedDirectBuf.remaining() == 0);
   }
@@ -197,7 +197,7 @@ public class SnappyCompressor implements Compressor {
    * @return The actual number of bytes of compressed data.
    */
   @Override
-  public synchronized int compress(byte[] b, int off, int len)
+  public int compress(byte[] b, int off, int len)
       throws IOException {
     if (b == null) {
       throw new NullPointerException();
@@ -250,7 +250,7 @@ public class SnappyCompressor implements Compressor {
    * Resets compressor so that a new set of input data can be processed.
    */
   @Override
-  public synchronized void reset() {
+  public void reset() {
     finish = false;
     finished = false;
     uncompressedDirectBuf.clear();
@@ -268,7 +268,7 @@ public class SnappyCompressor implements Compressor {
    * @param conf Configuration from which new setting are fetched
    */
   @Override
-  public synchronized void reinit(Configuration conf) {
+  public void reinit(Configuration conf) {
     reset();
   }
 
@@ -276,7 +276,7 @@ public class SnappyCompressor implements Compressor {
    * Return number of bytes given to this compressor since last reset.
    */
   @Override
-  public synchronized long getBytesRead() {
+  public long getBytesRead() {
     return bytesRead;
   }
 
@@ -284,7 +284,7 @@ public class SnappyCompressor implements Compressor {
    * Return number of bytes consumed by callers of compress since last reset.
    */
   @Override
-  public synchronized long getBytesWritten() {
+  public long getBytesWritten() {
     return bytesWritten;
   }
 
@@ -292,7 +292,7 @@ public class SnappyCompressor implements Compressor {
    * Closes the compressor and discards any unprocessed input.
    */
   @Override
-  public synchronized void end() {
+  public void end() {
   }
 
   private native static void initIDs();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f9ab998/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
index a867717..b5f5acf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
@@ -103,7 +103,7 @@ public class SnappyDecompressor implements Decompressor {
    * @param len Length
    */
   @Override
-  public synchronized void setInput(byte[] b, int off, int len) {
+  public void setInput(byte[] b, int off, int len) {
     if (b == null) {
       throw new NullPointerException();
     }
@@ -127,7 +127,7 @@ public class SnappyDecompressor implements Decompressor {
    * aside to be loaded by this function while the compressed data are
    * consumed.
    */
-  synchronized void setInputFromSavedData() {
+  void setInputFromSavedData() {
     compressedDirectBufLen = Math.min(userBufLen, directBufferSize);
 
     // Reinitialize snappy's input direct buffer
@@ -144,7 +144,7 @@ public class SnappyDecompressor implements Decompressor {
    * Does nothing.
    */
   @Override
-  public synchronized void setDictionary(byte[] b, int off, int len) {
+  public void setDictionary(byte[] b, int off, int len) {
     // do nothing
   }
 
@@ -158,7 +158,7 @@ public class SnappyDecompressor implements Decompressor {
    *         order to provide more input.
    */
   @Override
-  public synchronized boolean needsInput() {
+  public boolean needsInput() {
     // Consume remaining compressed data?
     if (uncompressedDirectBuf.remaining() > 0) {
       return false;
@@ -183,7 +183,7 @@ public class SnappyDecompressor implements Decompressor {
    * @return <code>false</code>.
    */
   @Override
-  public synchronized boolean needsDictionary() {
+  public boolean needsDictionary() {
     return false;
   }
 
@@ -195,7 +195,7 @@ public class SnappyDecompressor implements Decompressor {
    *         data output stream has been reached.
    */
   @Override
-  public synchronized boolean finished() {
+  public boolean finished() {
     return (finished && uncompressedDirectBuf.remaining() == 0);
   }
 
@@ -212,7 +212,7 @@ public class SnappyDecompressor implements Decompressor {
    * @throws IOException
    */
   @Override
-  public synchronized int decompress(byte[] b, int off, int len)
+  public int decompress(byte[] b, int off, int len)
       throws IOException {
     if (b == null) {
       throw new NullPointerException();
@@ -257,13 +257,13 @@ public class SnappyDecompressor implements Decompressor {
    * @return <code>0</code>.
    */
   @Override
-  public synchronized int getRemaining() {
+  public int getRemaining() {
     // Never use this function in BlockDecompressorStream.
     return 0;
   }
 
   @Override
-  public synchronized void reset() {
+  public void reset() {
     finished = false;
     compressedDirectBufLen = 0;
     uncompressedDirectBuf.limit(directBufferSize);
@@ -276,7 +276,7 @@ public class SnappyDecompressor implements Decompressor {
    * input data can be processed.
    */
   @Override
-  public synchronized void end() {
+  public void end() {
     // do nothing
   }
 
@@ -333,7 +333,7 @@ public class SnappyDecompressor implements Decompressor {
     private boolean endOfInput;
 
     @Override
-    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+    public void decompress(ByteBuffer src, ByteBuffer dst)
         throws IOException {
       assert dst.isDirect() : "dst.isDirect()";
       assert src.isDirect() : "src.isDirect()";
@@ -343,13 +343,13 @@ public class SnappyDecompressor implements Decompressor {
     }
 
     @Override
-    public synchronized void setDictionary(byte[] b, int off, int len) {
+    public void setDictionary(byte[] b, int off, int len) {
       throw new UnsupportedOperationException(
           "byte[] arrays are not supported for DirectDecompressor");
     }
 
     @Override
-    public synchronized int decompress(byte[] b, int off, int len) {
+    public int decompress(byte[] b, int off, int len) {
       throw new UnsupportedOperationException(
           "byte[] arrays are not supported for DirectDecompressor");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f9ab998/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
index ce7f68d..6799403 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
@@ -243,7 +243,7 @@ public class ZlibCompressor implements Compressor {
    * @param conf Configuration storing new settings
    */
   @Override
-  public synchronized void reinit(Configuration conf) {
+  public void reinit(Configuration conf) {
     reset();
     if (conf == null) {
       return;
@@ -260,7 +260,7 @@ public class ZlibCompressor implements Compressor {
   }
 
   @Override
-  public synchronized void setInput(byte[] b, int off, int len) {
+  public void setInput(byte[] b, int off, int len) {
     if (b== null) {
       throw new NullPointerException();
     }
@@ -280,7 +280,7 @@ public class ZlibCompressor implements Compressor {
   }
   
   //copy enough data from userBuf to uncompressedDirectBuf
-  synchronized void setInputFromSavedData() {
+  void setInputFromSavedData() {
     int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
     ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
     userBufLen -= len;
@@ -289,7 +289,7 @@ public class ZlibCompressor implements Compressor {
   }
 
   @Override
-  public synchronized void setDictionary(byte[] b, int off, int len) {
+  public void setDictionary(byte[] b, int off, int len) {
     if (stream == 0 || b == null) {
       throw new NullPointerException();
     }
@@ -300,7 +300,7 @@ public class ZlibCompressor implements Compressor {
   }
 
   @Override
-  public synchronized boolean needsInput() {
+  public boolean needsInput() {
     // Consume remaining compressed data?
     if (compressedDirectBuf.remaining() > 0) {
       return false;
@@ -329,19 +329,19 @@ public class ZlibCompressor implements Compressor {
   }
   
   @Override
-  public synchronized void finish() {
+  public void finish() {
     finish = true;
   }
   
   @Override
-  public synchronized boolean finished() {
+  public boolean finished() {
     // Check if 'zlib' says its 'finished' and
     // all compressed data has been consumed
     return (finished && compressedDirectBuf.remaining() == 0);
   }
 
   @Override
-  public synchronized int compress(byte[] b, int off, int len) 
+  public int compress(byte[] b, int off, int len) 
     throws IOException {
     if (b == null) {
       throw new NullPointerException();
@@ -392,7 +392,7 @@ public class ZlibCompressor implements Compressor {
    * @return the total (non-negative) number of compressed bytes output so far
    */
   @Override
-  public synchronized long getBytesWritten() {
+  public long getBytesWritten() {
     checkStream();
     return getBytesWritten(stream);
   }
@@ -403,13 +403,13 @@ public class ZlibCompressor implements Compressor {
    * @return the total (non-negative) number of uncompressed bytes input so far
    */
   @Override
-  public synchronized long getBytesRead() {
+  public long getBytesRead() {
     checkStream();
     return getBytesRead(stream);
   }
 
   @Override
-  public synchronized void reset() {
+  public void reset() {
     checkStream();
     reset(stream);
     finish = false;
@@ -423,7 +423,7 @@ public class ZlibCompressor implements Compressor {
   }
   
   @Override
-  public synchronized void end() {
+  public void end() {
     if (stream != 0) {
       end(stream);
       stream = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f9ab998/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
index 575ce3c..89c879a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
@@ -120,7 +120,7 @@ public class ZlibDecompressor implements Decompressor {
   }
 
   @Override
-  public synchronized void setInput(byte[] b, int off, int len) {
+  public void setInput(byte[] b, int off, int len) {
     if (b == null) {
       throw new NullPointerException();
     }
@@ -139,7 +139,7 @@ public class ZlibDecompressor implements Decompressor {
     uncompressedDirectBuf.position(directBufferSize);
   }
   
-  synchronized void setInputFromSavedData() {
+  void setInputFromSavedData() {
     compressedDirectBufOff = 0;
     compressedDirectBufLen = userBufLen;
     if (compressedDirectBufLen > directBufferSize) {
@@ -157,7 +157,7 @@ public class ZlibDecompressor implements Decompressor {
   }
 
   @Override
-  public synchronized void setDictionary(byte[] b, int off, int len) {
+  public void setDictionary(byte[] b, int off, int len) {
     if (stream == 0 || b == null) {
       throw new NullPointerException();
     }
@@ -169,7 +169,7 @@ public class ZlibDecompressor implements Decompressor {
   }
 
   @Override
-  public synchronized boolean needsInput() {
+  public boolean needsInput() {
     // Consume remaining compressed data?
     if (uncompressedDirectBuf.remaining() > 0) {
       return false;
@@ -189,19 +189,19 @@ public class ZlibDecompressor implements Decompressor {
   }
 
   @Override
-  public synchronized boolean needsDictionary() {
+  public boolean needsDictionary() {
     return needDict;
   }
 
   @Override
-  public synchronized boolean finished() {
+  public boolean finished() {
     // Check if 'zlib' says it's 'finished' and
     // all compressed data has been consumed
     return (finished && uncompressedDirectBuf.remaining() == 0);
   }
 
   @Override
-  public synchronized int decompress(byte[] b, int off, int len) 
+  public int decompress(byte[] b, int off, int len) 
     throws IOException {
     if (b == null) {
       throw new NullPointerException();
@@ -240,7 +240,7 @@ public class ZlibDecompressor implements Decompressor {
    *
    * @return the total (non-negative) number of uncompressed bytes output so far
    */
-  public synchronized long getBytesWritten() {
+  public long getBytesWritten() {
     checkStream();
     return getBytesWritten(stream);
   }
@@ -250,7 +250,7 @@ public class ZlibDecompressor implements Decompressor {
    *
    * @return the total (non-negative) number of compressed bytes input so far
    */
-  public synchronized long getBytesRead() {
+  public long getBytesRead() {
     checkStream();
     return getBytesRead(stream);
   }
@@ -263,7 +263,7 @@ public class ZlibDecompressor implements Decompressor {
    * @return the total (non-negative) number of unprocessed bytes in input
    */
   @Override
-  public synchronized int getRemaining() {
+  public int getRemaining() {
     checkStream();
     return userBufLen + getRemaining(stream);  // userBuf + compressedDirectBuf
   }
@@ -272,7 +272,7 @@ public class ZlibDecompressor implements Decompressor {
    * Resets everything including the input buffers (user and direct).</p>
    */
   @Override
-  public synchronized void reset() {
+  public void reset() {
     checkStream();
     reset(stream);
     finished = false;
@@ -284,7 +284,7 @@ public class ZlibDecompressor implements Decompressor {
   }
 
   @Override
-  public synchronized void end() {
+  public void end() {
     if (stream != 0) {
       end(stream);
       stream = 0;
@@ -372,7 +372,7 @@ public class ZlibDecompressor implements Decompressor {
     private boolean endOfInput;
 
     @Override
-    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+    public void decompress(ByteBuffer src, ByteBuffer dst)
         throws IOException {
       assert dst.isDirect() : "dst.isDirect()";
       assert src.isDirect() : "src.isDirect()";
@@ -382,13 +382,13 @@ public class ZlibDecompressor implements Decompressor {
     }
 
     @Override
-    public synchronized void setDictionary(byte[] b, int off, int len) {
+    public void setDictionary(byte[] b, int off, int len) {
       throw new UnsupportedOperationException(
           "byte[] arrays are not supported for DirectDecompressor");
     }
 
     @Override
-    public synchronized int decompress(byte[] b, int off, int len) {
+    public int decompress(byte[] b, int off, int len) {
       throw new UnsupportedOperationException(
           "byte[] arrays are not supported for DirectDecompressor");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f9ab998/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
index 551f282..5cacebf 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
@@ -19,6 +19,18 @@ package org.apache.hadoop.io.compress;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,4 +79,79 @@ public class TestCodecPool {
     assertEquals(LEASE_COUNT_ERR, 0,
         CodecPool.getLeasedDecompressorsCount(codec));
   }
+
+  @Test(timeout = 1000)
+  public void testMultiThreadedCompressorPool() throws InterruptedException {
+    final int iterations = 4;
+    ExecutorService threadpool = Executors.newFixedThreadPool(3);
+    final LinkedBlockingDeque<Compressor> queue = new LinkedBlockingDeque<Compressor>(
+        2 * iterations);
+
+    Callable<Boolean> consumer = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        Compressor c = queue.take();
+        CodecPool.returnCompressor(c);
+        return c != null;
+      }
+    };
+
+    Callable<Boolean> producer = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        Compressor c = CodecPool.getCompressor(codec);
+        queue.put(c);
+        return c != null;
+      }
+    };
+
+    for (int i = 0; i < iterations; i++) {
+      threadpool.submit(consumer);
+      threadpool.submit(producer);
+    }
+
+    // wait for completion
+    threadpool.shutdown();
+    threadpool.awaitTermination(1000, TimeUnit.SECONDS);
+
+    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec));
+  }
+
+  @Test(timeout = 1000)
+  public void testMultiThreadedDecompressorPool() throws InterruptedException {
+    final int iterations = 4;
+    ExecutorService threadpool = Executors.newFixedThreadPool(3);
+    final LinkedBlockingDeque<Decompressor> queue = new LinkedBlockingDeque<Decompressor>(
+        2 * iterations);
+
+    Callable<Boolean> consumer = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        Decompressor dc = queue.take();
+        CodecPool.returnDecompressor(dc);
+        return dc != null;
+      }
+    };
+
+    Callable<Boolean> producer = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        Decompressor c = CodecPool.getDecompressor(codec);
+        queue.put(c);
+        return c != null;
+      }
+    };
+
+    for (int i = 0; i < iterations; i++) {
+      threadpool.submit(consumer);
+      threadpool.submit(producer);
+    }
+
+    // wait for completion
+    threadpool.shutdown();
+    threadpool.awaitTermination(1000, TimeUnit.SECONDS);
+
+    assertEquals(LEASE_COUNT_ERR, 0,
+        CodecPool.getLeasedDecompressorsCount(codec));
+  }
 }

Reply via email to