Repository: kafka
Updated Branches:
  refs/heads/trunk c18a1bd64 -> 4a0e011be


KAFKA-2882: Add constructor cache for Snappy and LZ4 Output/Input streams in 
Compressor.java

In the `wrapForOutput` and `wrapForInput` methods of 
`org.apache.kafka.common.record.Compressor`, `Class.forName("[compression 
codec]")` and `getConstructor` are invoked on every `wrapForOutput` / 
`wrapForInput` call. Reflection calls are expensive and impact performance at 
high volumes. This patch caches each looked-up `Constructor` to reduce the 
reflection overhead.

In our production deployments, this has reduced producer CPU usage by about 20%.

Author: Maksim Logvinenko <[email protected]>

Reviewers: Ismael Juma

Closes #580 from logarithm/compressor-getclass-cache


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a0e011b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a0e011b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a0e011b

Branch: refs/heads/trunk
Commit: 4a0e011be3d038763d6326bb0092524f809c3f4d
Parents: c18a1bd
Author: Maksim Logvinenko <[email protected]>
Authored: Thu Nov 26 22:18:21 2015 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Thu Nov 26 22:18:21 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/common/record/Compressor.java  | 87 ++++++++++++++++----
 1 file changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4a0e011b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 27f757a..1aee389 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import java.lang.reflect.Constructor;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Utils;
 
@@ -46,6 +47,40 @@ public class Compressor {
         }
     }
 
+    // dynamically load the snappy and lz4 classes to avoid runtime dependency 
if we are not using compression
+    // caching constructors to avoid invoking of Class.forName method for each 
batch
+    private static MemoizingConstructorSupplier snappyOutputStreamSupplier = 
new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, 
NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyOutputStream")
+                .getConstructor(OutputStream.class, Integer.TYPE);
+        }
+    });
+
+    private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new 
MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, 
NoSuchMethodException {
+            return 
Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
+                .getConstructor(OutputStream.class);
+        }
+    });
+
+    private static MemoizingConstructorSupplier snappyInputStreamSupplier = 
new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, 
NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyInputStream")
+                .getConstructor(InputStream.class);
+        }
+    });
+
+    private static MemoizingConstructorSupplier lz4InputStreamSupplier = new 
MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, 
NoSuchMethodException {
+            return 
Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
+                .getConstructor(InputStream.class);
+        }
+    });
+
     private final CompressionType type;
     private final DataOutputStream appendStream;
     private final ByteBufferOutputStream bufferStream;
@@ -79,7 +114,7 @@ public class Compressor {
     public ByteBuffer buffer() {
         return bufferStream.buffer();
     }
-    
+
     public double compressionRate() {
         ByteBuffer buffer = bufferStream.buffer();
         if (this.writtenUncompressed == 0)
@@ -209,21 +244,15 @@ public class Compressor {
                 case GZIP:
                     return new DataOutputStream(new GZIPOutputStream(buffer, 
bufferSize));
                 case SNAPPY:
-                    // dynamically load the snappy class to avoid runtime 
dependency
-                    // on snappy if we are not using it
                     try {
-                        Class<?> outputStreamClass = 
Class.forName("org.xerial.snappy.SnappyOutputStream");
-                        OutputStream stream = (OutputStream) 
outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE)
-                            .newInstance(buffer, bufferSize);
+                        OutputStream stream = (OutputStream) 
snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
                     }
                 case LZ4:
                     try {
-                        Class<?> outputStreamClass = 
Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream");
-                        OutputStream stream = (OutputStream) 
outputStreamClass.getConstructor(OutputStream.class)
-                            .newInstance(buffer);
+                        OutputStream stream = (OutputStream) 
lz4OutputStreamSupplier.get().newInstance(buffer);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
@@ -244,22 +273,15 @@ public class Compressor {
                 case GZIP:
                     return new DataInputStream(new GZIPInputStream(buffer));
                 case SNAPPY:
-                    // dynamically load the snappy class to avoid runtime 
dependency
-                    // on snappy if we are not using it
                     try {
-                        Class<?> inputStreamClass = 
Class.forName("org.xerial.snappy.SnappyInputStream");
-                        InputStream stream = (InputStream) 
inputStreamClass.getConstructor(InputStream.class)
-                            .newInstance(buffer);
+                        InputStream stream = (InputStream) 
snappyInputStreamSupplier.get().newInstance(buffer);
                         return new DataInputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
                     }
                 case LZ4:
-                    // dynamically load LZ4 class to avoid runtime dependency
                     try {
-                        Class<?> inputStreamClass = 
Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream");
-                        InputStream stream = (InputStream) 
inputStreamClass.getConstructor(InputStream.class)
-                            .newInstance(buffer);
+                        InputStream stream = (InputStream) 
lz4InputStreamSupplier.get().newInstance(buffer);
                         return new DataInputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
@@ -271,4 +293,33 @@ public class Compressor {
             throw new KafkaException(e);
         }
     }
+
+    private interface ConstructorSupplier {
+        Constructor get() throws ClassNotFoundException, NoSuchMethodException;
+    }
+
+    // this code is based on Guava's 
@see{com.google.common.base.Suppliers.MemoizingSupplier}
+    private static class MemoizingConstructorSupplier {
+        final ConstructorSupplier delegate;
+        transient volatile boolean initialized;
+        transient Constructor value;
+
+        public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
+            this.delegate = delegate;
+        }
+
+        public Constructor get() throws NoSuchMethodException, 
ClassNotFoundException {
+            if (!initialized) {
+                synchronized (this) {
+                    if (!initialized) {
+                        Constructor constructor = delegate.get();
+                        value = constructor;
+                        initialized = true;
+                        return constructor;
+                    }
+                }
+            }
+            return value;
+        }
+    }
 }

Reply via email to