Repository: kafka
Updated Branches:
  refs/heads/0.10.0 dca78b586 -> 05640c86e


KAFKA-3704: Remove hard-coded block size in KafkaProducer

Author: Guozhang Wang <[email protected]>

Reviewers: Ismael Juma

Closes #1371 from guozhangwang/K3565-remove-compression-blocksize

(cherry picked from commit 1182d61deb23b5cd86cbe462471f7df583a796e1)
Signed-off-by: Guozhang Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05640c86
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05640c86
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05640c86

Branch: refs/heads/0.10.0
Commit: 05640c86eebc673f110604e0c7b3fa3315dfcd7b
Parents: dca78b5
Author: Guozhang Wang <[email protected]>
Authored: Wed May 11 17:01:14 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Wed May 11 17:01:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/record/Compressor.java | 17 ++++++-----------
 .../org/apache/kafka/common/record/Record.java     |  2 +-
 docs/upgrade.html                                  |  1 +
 3 files changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/05640c86/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 37d53b8..60c15e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -33,7 +33,6 @@ public class Compressor {
 
     static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
     static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
-    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private static final float[] TYPE_TO_RATE;
 
@@ -53,7 +52,7 @@ public class Compressor {
         @Override
         public Constructor get() throws ClassNotFoundException, 
NoSuchMethodException {
             return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                .getConstructor(OutputStream.class, Integer.TYPE);
+                .getConstructor(OutputStream.class);
         }
     });
 
@@ -91,7 +90,7 @@ public class Compressor {
     public float compressionRate;
     public long maxTimestamp;
 
-    public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) {
+    public Compressor(ByteBuffer buffer, CompressionType type) {
         this.type = type;
         this.initPos = buffer.position();
 
@@ -108,11 +107,7 @@ public class Compressor {
 
         // create the stream
         bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, type, blockSize);
-    }
-
-    public Compressor(ByteBuffer buffer, CompressionType type) {
-        this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
+        appendStream = wrapForOutput(bufferStream, type);
     }
 
     public ByteBuffer buffer() {
@@ -246,16 +241,16 @@ public class Compressor {
 
     // the following two functions also need to be public since they are used 
in MemoryRecords.iteration
 
-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream 
buffer, CompressionType type, int bufferSize) {
+    static public DataOutputStream wrapForOutput(ByteBufferOutputStream 
buffer, CompressionType type) {
         try {
             switch (type) {
                 case NONE:
                     return new DataOutputStream(buffer);
                 case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer, 
bufferSize));
+                    return new DataOutputStream(new GZIPOutputStream(buffer));
                 case SNAPPY:
                     try {
-                        OutputStream stream = (OutputStream) 
snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
+                        OutputStream stream = (OutputStream) 
snappyOutputStreamSupplier.get().newInstance(buffer);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/05640c86/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java 
b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 147ad86..baab9ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -146,7 +146,7 @@ public final class Record {
     public static void write(ByteBuffer buffer, long timestamp, byte[] key, 
byte[] value, CompressionType type, int valueOffset, int valueSize) {
         // construct the compressor with compression type none since this 
function will not do any
         //compression according to the input type, it will just write the 
record's payload as is
-        Compressor compressor = new Compressor(buffer, CompressionType.NONE, 
buffer.capacity());
+        Compressor compressor = new Compressor(buffer, CompressionType.NONE);
         compressor.putRecord(timestamp, key, value, type, valueOffset, 
valueSize);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/05640c86/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4b8ec7e..3c98540 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -91,6 +91,7 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should 
be upgraded to 0.9
 
 <ul>
     <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka 
Streams</b> is available for stream processing on data stored in Kafka topics. 
This new client library only works with 0.10.x and upward versioned brokers due 
to message format changes mentioned above. For more information please read <a 
href="#streams_overview">this section</a>.</li>
+    <li> If compression with snappy or gzip is enabled, the new producer will 
use the compression scheme's default buffer size (this is already the case for 
LZ4) instead of 1 KB in order to improve the compression ratio. Note that the 
default buffer sizes for gzip, snappy and LZ4 are 0.5 KB, 32 KB and 64 KB 
respectively. For the snappy case, a producer with 5000 partitions will require 
an additional 155 MB of JVM heap.</li>
     <li> The default value of the configuration parameter 
<code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
     <li> The new consumer now exposes the configuration parameter 
<code>exclude.internal.topics</code> to restrict internal topics (such as the 
consumer offsets topic) from accidentally being included in regular expression 
subscriptions. By default, it is enabled.</li>
     <li> The old Scala producer has been deprecated. Users should migrate 
their code to the Java producer included in the kafka-clients JAR as soon as 
possible. </li>

Reply via email to