[jira] [Created] (KAFKA-7975) Provide client API version to authorizer
Ying Zheng created KAFKA-7975:
------------------------------

             Summary: Provide client API version to authorizer
                 Key: KAFKA-7975
                 URL: https://issues.apache.org/jira/browse/KAFKA-7975
             Project: Kafka
          Issue Type: Improvement
          Components: core
            Reporter: Ying Zheng
            Assignee: Ying Zheng


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
Ying Zheng created KAFKA-7142:
------------------------------

             Summary: Rebalancing large consumer group can block the coordinator broker for several seconds
                 Key: KAFKA-7142
                 URL: https://issues.apache.org/jira/browse/KAFKA-7142
             Project: Kafka
          Issue Type: Improvement
            Reporter: Ying Zheng
[jira] [Created] (KAFKA-6432) Lookup indices may cause unnecessary page fault
Ying Zheng created KAFKA-6432:
------------------------------

             Summary: Lookup indices may cause unnecessary page fault
                 Key: KAFKA-6432
                 URL: https://issues.apache.org/jira/browse/KAFKA-6432
             Project: Kafka
          Issue Type: Improvement
          Components: core, log
            Reporter: Ying Zheng
         Attachments: Binary Search - Diagram 1.png, Binary Search - Diagram 2.png

For each topic-partition, the Kafka broker maintains two indices: one for message offsets and one for message timestamps. By default, a new entry is appended to each index for every 4 KB of messages. Index lookup is a simple binary search. The index files are mmapped, and cached by the Linux page cache.

Both consumer fetches and follower fetches have to do an offset lookup before accessing the actual message data. The plain binary search used for the lookup is not cache friendly, and can cause page faults even on high-QPS topic-partitions. For example (diagram 1), when looking up an index entry in page 12, the binary search has to read pages 0, 6, 9 and 11. After new messages are appended to the topic-partition, the index grows to 13 pages. Now, if a follower fetch request looks up the first index entry of page 13, the binary search visits pages 0, 7, 10 and 12. Among those, pages 7 and 10 have not been touched for a long time and may already have been paged out to disk.

In practice, on a normal Kafka broker, all follower fetch requests and most consumer fetch requests only look up the last few entries of the index. We can make the index lookup more cache friendly by searching the last one or two pages of the index first (diagram 2).
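The "search the warm tail first" idea can be sketched roughly as below. This is a hypothetical illustration, not Kafka's actual OffsetIndex code: the class name, the entry layout (one long per entry), and the warm-section size are all assumptions. The hot path binary-searches only the last couple of pages' worth of entries (which are almost certainly resident in the page cache), and falls back to a full binary search only when the target is older than that warm section.

```java
// Hypothetical sketch of a cache-friendly index lookup (not Kafka's code).
public class WarmIndexLookup {
    // Entries covered by the last two 4 KB pages, assuming 8-byte entries
    // (assumed sizes, for illustration only).
    static final int WARM_ENTRIES = 2 * 4096 / 8;

    // Returns the position of the largest entry <= target, or -1 if none.
    static int lookup(long[] index, long target) {
        int warmStart = Math.max(0, index.length - WARM_ENTRIES);
        if (index.length > 0 && target >= index[warmStart]) {
            // Hot path: only touch the last one or two pages of the index.
            return floorSearch(index, warmStart, index.length, target);
        }
        // Cold path: full binary search over the whole index.
        return floorSearch(index, 0, index.length, target);
    }

    // Standard binary search for the floor entry in [from, to).
    static int floorSearch(long[] index, int from, int to, long target) {
        int lo = from, hi = to - 1, result = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index[mid] <= target) { result = mid; lo = mid + 1; }
            else { hi = mid - 1; }
        }
        return result;
    }
}
```

Since fetch requests overwhelmingly target recent offsets, the hot path keeps the lookup within pages that the previous lookup already faulted in.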
[jira] [Created] (KAFKA-6431) Lock contention in Purgatory
Ying Zheng created KAFKA-6431:
------------------------------

             Summary: Lock contention in Purgatory
                 Key: KAFKA-6431
                 URL: https://issues.apache.org/jira/browse/KAFKA-6431
             Project: Kafka
          Issue Type: Improvement
          Components: core
            Reporter: Ying Zheng
            Priority: Minor

Purgatory is the data structure in the Kafka broker that manages delayed operations. A ConcurrentHashMap (Kafka's Pool) maps each operation key to the list of operations (a ConcurrentLinkedQueue) interested in that key. When an operation is done or expired, it is removed from the list (ConcurrentLinkedQueue); when the list becomes empty, it is removed from the ConcurrentHashMap. The second step has to be protected by a lock, to avoid adding new operations into a list that is being removed. This is currently done with a globally shared ReentrantReadWriteLock: all read operations on purgatory have to acquire the read side of this lock, while list removal needs the write side.

Our profiling shows that the Kafka broker spends a nontrivial amount of time on this read-write lock. The problem is exacerbated when there are a large number of short-lived operations. For example, when doing sync produce (acks=all), a DelayedProduce operation is added and then removed for every message. If the QPS of the topic is not high, it is very likely that when the operation is done and removed, the list for that key (topic-partition) becomes empty and has to be removed while holding the write lock. That blocks all read/write operations on purgatory for a while. With tens of I/O threads accessing purgatory concurrently, this shared lock can easily become a bottleneck.

In fact, we only need to avoid concurrent read/write on the same key; operations on different keys do not conflict with each other. I suggest sharding purgatory into smaller partitions, and locking each partition independently.
Assuming there are 10 I/O threads actively accessing purgatory, sharding it into 512 partitions reduces the probability of two threads hitting the same partition at the same time to about 2%. We can also use ReentrantLock instead of ReentrantReadWriteLock: when reads do not greatly outnumber writes, ReentrantLock has lower overhead than ReentrantReadWriteLock.
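The sharding idea can be sketched as follows. This is a minimal hypothetical illustration, not Kafka's actual Purgatory code: the class name, method names, and shard count are assumptions. The key space is hashed into N shards, each guarded by its own ReentrantLock, so removing an empty watcher list for one key never blocks operations on keys that hash to a different shard.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a sharded purgatory watcher map (not Kafka's code).
public class ShardedWatchers<K, O> {
    private static final int SHARDS = 512;  // assumed shard count from the proposal
    private final ConcurrentHashMap<K, Queue<O>> watchers = new ConcurrentHashMap<>();
    private final ReentrantLock[] locks = new ReentrantLock[SHARDS];

    public ShardedWatchers() {
        for (int i = 0; i < SHARDS; i++) locks[i] = new ReentrantLock();
    }

    private ReentrantLock lockFor(K key) {
        return locks[(key.hashCode() & 0x7fffffff) % SHARDS];
    }

    // Register an operation under a key; only this key's shard is locked.
    public void watch(K key, O op) {
        ReentrantLock lock = lockFor(key);
        lock.lock();
        try {
            watchers.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(op);
        } finally {
            lock.unlock();
        }
    }

    // Remove a completed operation; if its list becomes empty, remove the
    // list itself while holding only this key's shard lock, so other shards
    // stay unblocked.
    public void complete(K key, O op) {
        ReentrantLock lock = lockFor(key);
        lock.lock();
        try {
            Queue<O> ops = watchers.get(key);
            if (ops != null) {
                ops.remove(op);
                if (ops.isEmpty()) watchers.remove(key);
            }
        } finally {
            lock.unlock();
        }
    }

    public int size() { return watchers.size(); }
}
```

Per the note above, plain ReentrantLock is used for each shard rather than ReentrantReadWriteLock, since the short add/remove critical sections do not benefit from reader/writer separation.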
[jira] [Created] (KAFKA-6430) Improve Kafka GZip compression performance
Ying Zheng created KAFKA-6430:
------------------------------

             Summary: Improve Kafka GZip compression performance
                 Key: KAFKA-6430
                 URL: https://issues.apache.org/jira/browse/KAFKA-6430
             Project: Kafka
          Issue Type: Improvement
          Components: clients, core
            Reporter: Ying Zheng
            Priority: Minor

To compress messages, Kafka uses a DataOutputStream on top of a GZIPOutputStream:

    new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));

To decompress messages, Kafka uses a DataInputStream on top of a GZIPInputStream:

    new DataInputStream(new GZIPInputStream(buffer));

This is straightforward, but actually inefficient. For each message, in addition to the key and value data, Kafka has to write roughly 30 metadata bytes (the exact number varies slightly across Kafka versions), including the magic byte, checksum, timestamp, offset, key length, value length, etc. For each of these bytes, DataOutputStream has to call write(int) once. Here is the awkward writeInt() method of DataOutputStream, which writes the 4 bytes separately, in big-endian order:

    public final void writeInt(int v) throws IOException {
        out.write((v >>> 24) & 0xFF);
        out.write((v >>> 16) & 0xFF);
        out.write((v >>>  8) & 0xFF);
        out.write((v >>>  0) & 0xFF);
        incCount(4);
    }

Unfortunately, GZIPOutputStream does not implement the single-byte write(int) method. Instead, it only provides a write(byte[], offset, len) method, which calls the corresponding JNI zlib function.
The write(int) calls from DataOutputStream are translated into write(byte[], offset, len) calls in a very inefficient way (Oracle JDK 1.8 code):

    class DeflaterOutputStream {
        public void write(int b) throws IOException {
            byte[] buf = new byte[1];
            buf[0] = (byte)(b & 0xff);
            write(buf, 0, 1);
        }

        public void write(byte[] b, int off, int len) throws IOException {
            if (def.finished()) {
                throw new IOException("write beyond end of stream");
            }
            if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
                throw new IndexOutOfBoundsException();
            } else if (len == 0) {
                return;
            }
            if (!def.finished()) {
                def.setInput(b, off, len);
                while (!def.needsInput()) {
                    deflate();
                }
            }
        }
    }

    class GZIPOutputStream extends DeflaterOutputStream {
        public synchronized void write(byte[] buf, int off, int len) throws IOException {
            super.write(buf, off, len);
            crc.update(buf, off, len);
        }
    }

    class Deflater {
        private native int deflateBytes(long addr, byte[] b, int off, int len, int flush);
    }

    class CRC32 {
        public void update(byte[] b, int off, int len) {
            if (b == null) {
                throw new NullPointerException();
            }
            if (off < 0 || len < 0 || off > b.length - len) {
                throw new ArrayIndexOutOfBoundsException();
            }
            crc = updateBytes(crc, b, off, len);
        }

        private native static int updateBytes(int crc, byte[] b, int off, int len);
    }

For each metadata byte, the code above has to allocate a 1-byte array, acquire several locks, and call two native JNI methods (Deflater.deflateBytes and CRC32.updateBytes). Each Kafka message carries roughly 30 such metadata bytes.
The call stack of Deflater.deflateBytes():

    DeflaterOutputStream.write(int b)
    -> GZIPOutputStream.write(byte[] buf, int off, int len)
    -> DeflaterOutputStream.write(byte[] b, int off, int len)
    -> DeflaterOutputStream.deflate()
    -> Deflater.deflate(byte[] b, int off, int len)
    -> Deflater.deflate(byte[] b, int off, int len, int flush)
    -> Deflater.deflateBytes(long addr, byte[] b, int off, int len, int flush)

The call stack of CRC32.updateBytes():

    DeflaterOutputStream.write(int b)
    -> GZIPOutputStream.write(byte[] buf, int off, int len)
    -> CRC32.update(byte[] b, int off, int len)
    -> CRC32.updateBytes(int crc, byte[] b, int off, int len)

At Uber, we found that adding a small buffer between the DataOutputStream and the GZIPOutputStream speeds up Kafka GZip compression by about 60% on average:

    -return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
    +return new DataOutputStream(new BufferedOutputStream(new GZIPOutputStream(buffer, bufferSize), 1 << 14));

A similar fix applies to GZip decompression. Here are the test results using production traffic at Uber:

|| Topic || Avg Message Size (bytes) || Vanilla Kafka Throughput (MB/s) || Kafka w/ GZip Buffer Throughput (MB/s) || Speed Up ||
| topic 1 | 197 | 10.9 | 21.9 | 2.0 |
| topic 2 | 208 | 8.5 | 15.9 | 1.9 |
| topic 3 | 624 | 15.3 | 20.2 | 1.3 |
| topic 4 | 766 | 28.0 | 43.7 | 1.6 |
| topic 5 | 1168 | 22.9 | 25.4 | 1.1 |
| topic 6 | 165021 | 9.1 | 9.2 | 1.0 |
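The buffering fix can be demonstrated in isolation with a small round-trip example. This is an illustrative sketch, not Kafka's actual compressor code: the class and method names are hypothetical, and the 16 KB buffer size matches the 1 << 14 in the patch above. The BufferedOutputStream / BufferedInputStream sits between the Data*Stream and the GZIP*Stream, so the many 1-byte metadata writes are batched into one JNI call per buffer fill instead of one per byte.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo of the buffered-GZip wiring (not Kafka's code).
public class BufferedGzip {
    static byte[] compress(byte[] payload) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // The buffer between DataOutputStream and GZIPOutputStream absorbs
        // the byte-at-a-time metadata writes.
        DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(new GZIPOutputStream(sink), 1 << 14));
        out.writeInt(payload.length);  // metadata written byte by byte...
        out.write(payload);            // ...now batched by the buffer
        out.close();                   // flushes the buffer, finishes the GZip stream
        return sink.toByteArray();
    }

    static byte[] decompress(byte[] compressed) throws IOException {
        // The same trick on the read path: batch the byte-at-a-time reads.
        DataInputStream in = new DataInputStream(new BufferedInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed)), 1 << 14));
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        return payload;
    }
}
```

With the buffer in place, Deflater.deflateBytes and CRC32.updateBytes are invoked once per 16 KB of output rather than once per metadata byte, which is where the throughput gain in the table above comes from.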