Repository: kafka Updated Branches: refs/heads/trunk f892f0ca6 -> 1182d61de
KAFKA-3704: Remove hard-coded block size in KafkaProducer Author: Guozhang Wang <[email protected]> Reviewers: Ismael Juma Closes #1371 from guozhangwang/K3565-remove-compression-blocksize Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1182d61d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1182d61d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1182d61d Branch: refs/heads/trunk Commit: 1182d61deb23b5cd86cbe462471f7df583a796e1 Parents: f892f0c Author: Guozhang Wang <[email protected]> Authored: Wed May 11 17:01:14 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed May 11 17:01:14 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/common/record/Compressor.java | 17 ++++++----------- .../org/apache/kafka/common/record/Record.java | 2 +- docs/upgrade.html | 1 + 3 files changed, 8 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1182d61d/clients/src/main/java/org/apache/kafka/common/record/Compressor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index 37d53b8..60c15e6 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -33,7 +33,6 @@ public class Compressor { static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f; static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f; - static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024; private static final float[] TYPE_TO_RATE; @@ -53,7 +52,7 @@ public class Compressor { @Override public Constructor get() throws ClassNotFoundException, 
NoSuchMethodException { return Class.forName("org.xerial.snappy.SnappyOutputStream") - .getConstructor(OutputStream.class, Integer.TYPE); + .getConstructor(OutputStream.class); } }); @@ -91,7 +90,7 @@ public class Compressor { public float compressionRate; public long maxTimestamp; - public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) { + public Compressor(ByteBuffer buffer, CompressionType type) { this.type = type; this.initPos = buffer.position(); @@ -108,11 +107,7 @@ public class Compressor { // create the stream bufferStream = new ByteBufferOutputStream(buffer); - appendStream = wrapForOutput(bufferStream, type, blockSize); - } - - public Compressor(ByteBuffer buffer, CompressionType type) { - this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE); + appendStream = wrapForOutput(bufferStream, type); } public ByteBuffer buffer() { @@ -246,16 +241,16 @@ public class Compressor { // the following two functions also need to be public since they are used in MemoryRecords.iteration - static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) { + static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type) { try { switch (type) { case NONE: return new DataOutputStream(buffer); case GZIP: - return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize)); + return new DataOutputStream(new GZIPOutputStream(buffer)); case SNAPPY: try { - OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize); + OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer); return new DataOutputStream(stream); } catch (Exception e) { throw new KafkaException(e); http://git-wip-us.apache.org/repos/asf/kafka/blob/1182d61d/clients/src/main/java/org/apache/kafka/common/record/Record.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 147ad86..baab9ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -146,7 +146,7 @@ public final class Record { public static void write(ByteBuffer buffer, long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { // construct the compressor with compression type none since this function will not do any //compression according to the input type, it will just write the record's payload as is - Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity()); + Compressor compressor = new Compressor(buffer, CompressionType.NONE); compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1182d61d/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index 4b8ec7e..3c98540 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -91,6 +91,7 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9 <ul> <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read <a href="#streams_overview">this section</a>.</li> + <li> If compression with snappy or gzip is enabled, the new producer will use the compression scheme's default buffer size (this is already the case for LZ4) instead of 1 KB in order to improve the compression ratio. Note that the default buffer sizes for gzip, snappy and LZ4 are 0.5 KB, 32 KB and 64 KB respectively. 
For the snappy case, a producer with 5000 partitions will require an additional 155 MB of JVM heap.</li> <li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li> <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li> <li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li>
