[ https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma resolved KAFKA-6430.
--------------------------------
    Resolution: Fixed
    Fix Version/s: 1.1.0

> Improve Kafka GZip compression performance
> ------------------------------------------
>
>                 Key: KAFKA-6430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, compression, core
>            Reporter: Ying Zheng
>            Assignee: Ying Zheng
>            Priority: Minor
>             Fix For: 1.1.0
>
> To compress messages, Kafka uses a DataOutputStream on top of a GZIPOutputStream:
> new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> To decompress messages, Kafka uses a DataInputStream on top of a GZIPInputStream:
> new DataInputStream(new GZIPInputStream(buffer));
> This is very straightforward, but actually inefficient. For each message, in addition to the key and value data, Kafka has to write about 30 metadata bytes (the exact count varies slightly between Kafka versions), including the magic byte, checksum, timestamp, offset, key length, value length, etc. For each of these bytes, Java's DataOutputStream calls the single-byte write(int) method once. Here is the awkward writeInt() method in DataOutputStream, which writes the 4 bytes separately in big-endian order:
> {code}
> public final void writeInt(int v) throws IOException {
>     out.write((v >>> 24) & 0xFF);
>     out.write((v >>> 16) & 0xFF);
>     out.write((v >>>  8) & 0xFF);
>     out.write((v >>>  0) & 0xFF);
>     incCount(4);
> }
> {code}
> Unfortunately, GZIPOutputStream does not override the single-byte write(int) method. It only provides a write(byte[], offset, len) method, which calls the corresponding JNI zlib function. The write(int) calls coming from DataOutputStream are therefore translated into write(byte[], offset, len) calls in a very inefficient way (Oracle JDK 1.8 code):
> {code}
> class DeflaterOutputStream {
>     public void write(int b) throws IOException {
>         byte[] buf = new byte[1];
>         buf[0] = (byte) (b & 0xff);
>         write(buf, 0, 1);
>     }
>
>     public void write(byte[] b, int off, int len) throws IOException {
>         if (def.finished()) {
>             throw new IOException("write beyond end of stream");
>         }
>         if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
>             throw new IndexOutOfBoundsException();
>         } else if (len == 0) {
>             return;
>         }
>         if (!def.finished()) {
>             def.setInput(b, off, len);
>             while (!def.needsInput()) {
>                 deflate();
>             }
>         }
>     }
> }
>
> class GZIPOutputStream extends DeflaterOutputStream {
>     public synchronized void write(byte[] buf, int off, int len)
>         throws IOException
>     {
>         super.write(buf, off, len);
>         crc.update(buf, off, len);
>     }
> }
>
> class Deflater {
>     private native int deflateBytes(long addr, byte[] b, int off, int len, int flush);
> }
>
> class CRC32 {
>     public void update(byte[] b, int off, int len) {
>         if (b == null) {
>             throw new NullPointerException();
>         }
>         if (off < 0 || len < 0 || off > b.length - len) {
>             throw new ArrayIndexOutOfBoundsException();
>         }
>         crc = updateBytes(crc, b, off, len);
>     }
>
>     private native static int updateBytes(int crc, byte[] b, int off, int len);
> }
> {code}
> For each metadata byte, the code above has to allocate a one-byte array, acquire several locks, and call two native JNI methods (Deflater.deflateBytes and CRC32.updateBytes). Each Kafka message carries roughly 30 such metadata bytes.
> The call stack of Deflater.deflateBytes():
> DeflaterOutputStream.write(int b) ->
> GZIPOutputStream.write(byte[] buf, int off, int len) ->
> DeflaterOutputStream.write(byte[] b, int off, int len) ->
> DeflaterOutputStream.deflate() ->
> Deflater.deflate(byte[] b, int off, int len) ->
> Deflater.deflate(byte[] b, int off, int len, int flush) ->
> Deflater.deflateBytes(long addr, byte[] b, int off, int len, int flush)
> The call stack of CRC32.updateBytes():
> DeflaterOutputStream.write(int b) ->
> GZIPOutputStream.write(byte[] buf, int off, int len) ->
> CRC32.update(byte[] b, int off, int len) ->
> CRC32.updateBytes(int crc, byte[] b, int off, int len)
> At Uber, we found that adding a small buffer between the DataOutputStream and the GZIPOutputStream can speed up Kafka GZip compression by about 60% on average:
> {code}
> -        return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> +        return new DataOutputStream(new BufferedOutputStream(new GZIPOutputStream(buffer, bufferSize), 1 << 14));
> {code}
> A similar issue also exists in GZip decompression, which can be fixed by adding a buffer with BufferedInputStream, as sketched below.
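> A minimal sketch of the decompression-side fix (assuming the same 16 KB buffer size as the compression-side patch; the buffer size actually used in Kafka may differ):
> {code}
> new DataInputStream(new BufferedInputStream(new GZIPInputStream(buffer), 1 << 14));
> {code}
> With the buffer in place, the single-byte read() calls issued by DataInputStream.readInt() and friends are served from the in-heap buffer, instead of each crossing into the native inflater through InflaterInputStream.read().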
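> For anyone who wants to reproduce the effect locally, here is an illustrative, self-contained micro-benchmark sketch. It is not the methodology behind the numbers below (those come from production traffic); the class name, record layout, iteration count, and buffer sizes are all arbitrary choices for illustration:
> {code}
> import java.io.*;
> import java.util.zip.GZIPOutputStream;
>
> public class GzipBufferBench {
>     // Compress many small "messages": a handful of small header writes
>     // (mimicking Kafka's per-record metadata) followed by a small payload.
>     static long run(boolean buffered) throws IOException {
>         OutputStream gzip = new GZIPOutputStream(new ByteArrayOutputStream(), 8 * 1024);
>         if (buffered) {
>             // The proposed fix: a 16 KB buffer in front of GZIPOutputStream.
>             gzip = new BufferedOutputStream(gzip, 1 << 14);
>         }
>         DataOutputStream out = new DataOutputStream(gzip);
>         byte[] payload = new byte[200];
>         long start = System.nanoTime();
>         for (int i = 0; i < 100_000; i++) {
>             out.writeByte(2);             // e.g. magic byte
>             out.writeInt(i);              // e.g. offset delta: 4 single-byte writes
>             out.writeLong(start);         // e.g. timestamp
>             out.writeInt(payload.length); // value length: 4 more single-byte writes
>             out.write(payload);           // value
>         }
>         out.close();
>         return (System.nanoTime() - start) / 1_000_000;
>     }
>
>     public static void main(String[] args) throws IOException {
>         run(true);
>         run(false); // warm-up
>         System.out.println("unbuffered: " + run(false) + " ms");
>         System.out.println("buffered:   " + run(true) + " ms");
>     }
> }
> {code}
> Without the buffer, every byte produced by writeByte()/writeInt() goes through DeflaterOutputStream.write(int) and two JNI calls; with it, data reaches zlib in 16 KB chunks.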
> We have tested this improvement on Kafka 0.10.2 / Oracle JDK 8, with production traffic at Uber:
> || Topic || Avg Message Size (bytes) || Vanilla Kafka Throughput (MB/s) || Kafka w/ GZip Buffer Throughput (MB/s) || Speed Up ||
> | topic 1 | 197 | 10.9 | 21.9 | 2.0 |
> | topic 2 | 208 | 8.5 | 15.9 | 1.9 |
> | topic 3 | 624 | 15.3 | 20.2 | 1.3 |
> | topic 4 | 766 | 28.0 | 43.7 | 1.6 |
> | topic 5 | 1168 | 22.9 | 25.4 | 1.1 |
> | topic 6 | 165021 | 9.1 | 9.2 | 1.0 |



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)